1use std::{2 borrow::Cow,3 collections::HashMap,4 ffi::OsStr,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new(cmd: impl AsRef<OsStr>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: None,55 }56 }57 fn into_args(self) -> Vec<String> {58 let mut out = Vec::new();59 if !self.env.is_empty() {60 out.push("env".to_owned());61 for (k, v) in self.env {62 assert!(!k.contains('='));63 out.push(format!("{k}={v}"));64 }65 }66 out.push(self.command);67 out.extend(self.args);68 out69 }70 fn into_string(self) -> String {71 let mut out = String::new();72 if !self.env.is_empty() {73 out.push_str("env");74 for (k, v) in self.env {75 out.push(' ');76 assert!(!k.contains('='));77 escape_bash(&k, &mut out);78 out.push('=');79 escape_bash(&v, &mut out);80 }81 }82 if !out.is_empty() {83 out.push(' ');84 }85 escape_bash(&self.command, &mut out);86 for arg in self.args {87 out.push(' ');88 escape_bash(&arg, &mut out);89 }90 out91 }92 fn into_command(self) -> Command {93 let mut out = Command::new(self.command);94 out.args(self.args);95 for (k, v) in self.env {96 out.env(k, v);97 }98 out99 }100 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {101 Ok(if let Some(session) = self.ssh_session.clone() {102 let cmd = self.into_command();103 Either::Right(104 cmd.over_ssh(session)105 .map_err(|e| anyhow!("ssh error: {e}"))?,106 )107 } else {108 let cmd = self.into_command();109 Either::Left(cmd)110 })111 }112 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {113 let arg = arg.as_ref();114 self.args.push(ostoutf8(arg));115 self116 }117 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {118 let arg = arg.as_ref();119 let value = value.as_ref();120 let arg = ostoutf8(arg);121 let value = ostoutf8(value);122 self.arg(format!("{arg}={value}"));123 self124 }125 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {126 self.arg(arg);127 self.arg(value);128 self129 }130 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {131 for arg in args.into_iter() {132 let arg = arg.as_ref();133 self.args.push(ostoutf8(arg));134 }135 self136 }137 pub fn sudo(self) -> Self {138 if std::env::var_os("NO_SUDO").is_some() {139 let mut out = Self::new("su");140 out.arg("-c").arg(self.into_string());141 out142 } else {143 let mut out = Self::new("sudo");144 out.args(self.into_args());145 out146 }147 }148 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {149 let mut out = Self::new("ssh");150 out.arg(on).arg("--");151 out.arg(self.into_string());152 out153 }154 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {155 self.ssh_session = Some(session);156 self157 }158159 pub async fn run(self) -> Result<()> {160 let str = self.clone().into_string();161 let cmd = self.into_command();162 run_nix_inner(str, cmd, &mut PlainHandler).await?;163 Ok(())164 }165 pub async fn run_string(self) -> Result<String> {166 let str = self.clone().into_string();167 let cmd = self.into_command();168 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;169 Ok(v)170 }171172 pub async fn run_nix_string(self) -> Result<String> {173 let str = self.clone().into_string();174 let mut cmd = self.into_command();175 cmd.arg("--log-format").arg("internal-json");176 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await177 }178 pub async fn run_nix(self) -> Result<()> {179 let str = self.clone().into_string();180 let mut cmd = self.into_command();181 cmd.arg("--log-format").arg("internal-json");182 cmd.stdout(Stdio::inherit());183 run_nix_inner(str, cmd, &mut NixHandler::default()).await184 }185}186187struct EmptyAsyncRead;188impl AsyncRead for EmptyAsyncRead {189 fn poll_read(190 self: std::pin::Pin<&mut Self>,191 _cx: &mut std::task::Context<'_>,192 _buf: &mut tokio::io::ReadBuf<'_>,193 ) -> Poll<std::io::Result<()>> {194 Poll::Pending195 }196}197198async fn run_nix_inner_stdout(199 str: String,200 cmd: Command,201 handler: &mut dyn Handler,202) -> Result<String> {203 Ok(run_nix_inner_raw(str, cmd, true, handler, None)204 .await?205 .expect("has out"))206}207async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {208 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;209 assert!(v.is_none());210 Ok(())211}212213pub trait Handler: Send {214 fn handle_line(&mut self, e: &str);215}216217pub struct ClonableHandler<H>(Arc<Mutex<H>>);218impl<H> Clone for ClonableHandler<H> {219 fn clone(&self) -> Self {220 Self(self.0.clone())221 }222}223impl<H> ClonableHandler<H> {224 pub fn new(inner: H) -> Self {225 Self(Arc::new(Mutex::new(inner)))226 }227}228impl<H: Handler> Handler for ClonableHandler<H> {229 fn handle_line(&mut self, e: &str) {230 self.0.lock().unwrap().handle_line(e)231 }232}233234struct PlainHandler;235impl Handler for PlainHandler {236 fn handle_line(&mut self, e: &str) {237 info!(target: "log", "{e}");238 }239}240241pub struct NoopHandler;242impl Handler for NoopHandler {243 fn handle_line(&mut self, _e: &str) {}244}245246#[derive(Default)]247pub struct NixHandler {248 spans: HashMap<u64, Span>,249}250fn process_message(m: &str) -> Cow<'_, str> {251 static OSC_CLEANER: Lazy<Regex> =252 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());253 OSC_CLEANER.replace_all(m, "")254}255impl Handler for NixHandler {256 fn handle_line(&mut self, e: &str) {257 if let Some(e) = e.strip_prefix("@nix ") {258 let log: NixLog = match serde_json::from_str(e) {259 Ok(l) => l,260 Err(err) => {261 warn!("failed to parse nix log line {:?}: {}", e, err);262 return;263 }264 };265 match log {266 NixLog::Msg { msg, raw_msg, .. } => {267 #[allow(clippy::nonminimal_bool)]268 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))269 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")270 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {271 if let Some(raw_msg) = raw_msg {272 if !msg.is_empty() {273 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())274 } else {275 info!(target: "nix", "{}", raw_msg.trim_end())276 }277 } else {278 info!(target: "nix", "{}", msg.trim_end())279 }280 }281 }282 NixLog::Start {283 ref fields,284 typ,285 id,286 ..287 } if typ == 105 && !fields.is_empty() => {288 if let [LogField::String(drv), ..] = &fields[..] {289 let mut drv = drv.as_str();290 if let Some(pkg) = drv.strip_prefix("/nix/store/") {291 let mut it = pkg.splitn(2, '-');292 it.next();293 if let Some(pkg) = it.next() {294 drv = pkg;295 }296 }297 info!(target: "nix","building {}", drv);298 let span = info_span!("build", drv);299 span.pb_start();300 self.spans.insert(id, span);301 } else {302 warn!("bad build log: {:?}", log)303 }304 }305 NixLog::Start {306 ref fields,307 typ,308 id,309 ..310 } if typ == 100 && fields.len() >= 3 => {311 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =312 &fields[..]313 {314 let mut drv = drv.as_str();315316 if let Some(pkg) = drv.strip_prefix("/nix/store/") {317 let mut it = pkg.splitn(2, '-');318 it.next();319 if let Some(pkg) = it.next() {320 drv = pkg;321 }322 }323 324 let span = info_span!("copy", from, to, drv);325 span.pb_start();326 self.spans.insert(id, span);327 } else {328 warn!("bad copy log: {:?}", log)329 }330 }331 NixLog::Start { text, typ, id, .. }332 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>333 {334 if !text.is_empty()335 && text != "querying info about missing paths"336 && text != "copying 0 paths"337 {338 let span = info_span!("job");339 span.pb_start();340 span.pb_set_message(&process_message(text.trim()));341 self.spans.insert(id, span);342 info!(target: "nix", "{}", text);343 }344 }345 NixLog::Start {346 text,347 level: 0,348 typ: 108,349 ..350 } if text.is_empty() => {351 352 }353 NixLog::Start {354 text,355 level: 4,356 typ: 109,357 ..358 } if text.starts_with("querying info about ") => {359 360 }361 NixLog::Start {362 text,363 level: 4,364 typ: 101,365 ..366 } if text.starts_with("downloading ") => {367 368 }369 NixLog::Start {370 text,371 level: 1,372 typ: 111,373 ..374 } if text.starts_with("waiting for a machine to build ") => {375 376 }377 NixLog::Start {378 text,379 level: 3,380 typ: 111,381 ..382 } if text.starts_with("resolved derivation: ") => {383 384 }385 NixLog::Start {386 text,387 level: 1,388 typ: 111,389 id,390 ..391 } if text.starts_with("waiting for lock on ") => {392 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();393 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {394 drv = txt;395 }396 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {397 drv = txt;398 }399 if let Some(txt) = drv.split("', '").next() {400 drv = txt;401 }402 if let Some(pkg) = drv.strip_prefix("/nix/store/") {403 let mut it = pkg.splitn(2, '-');404 it.next();405 if let Some(pkg) = it.next() {406 drv = pkg;407 }408 }409 let span = info_span!("waiting on drv", drv);410 span.pb_start();411 self.spans.insert(id, span);412 413 }414 NixLog::Stop { id, .. } => {415 self.spans.remove(&id);416 }417 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {418 if let Some(span) = self.spans.get(&id) {419 if let LogField::String(s) = &fields[0] {420 span.pb_set_message(&process_message(s.trim()));421 } else {422 warn!("bad fields: {fields:?}");423 }424 } else {425 warn!("unknown result id: {id} {typ} {fields:?}");426 }427 428 }429 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {430 if let Some(span) = self.spans.get(&id) {431 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =432 &fields[..4]433 {434 span.pb_set_length(*expected);435 span.pb_set_position(*done);436 } else {437 warn!("bad fields: {fields:?}");438 }439 } else {440 441 442 }443 444 }445 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {446 447 }448 _ => warn!("unknown log: {:?}", log),449 };450 } else {451 let e = e.trim();452 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {453 return;454 }455 info!("{e}")456 }457 }458}459460async fn run_nix_inner_raw(461 str: String,462 mut cmd: Command,463 want_stdout: bool,464 err_handler: &mut dyn Handler,465 mut out_handler: Option<&mut dyn Handler>,466) -> Result<Option<String>> {467 cmd.stderr(Stdio::piped());468 cmd.stdout(Stdio::piped());469 let mut child = cmd.spawn()?;470 let mut stderr = child.stderr.take().unwrap();471 let stdout = child.stdout.take().unwrap();472 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());473 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));474 let mut ob = want_stdout475 .then(|| out.take().unwrap())476 .unwrap_or_else(|| Box::new(EmptyAsyncRead));477 let mut ol = (!want_stdout)478 .then(|| out.take().unwrap())479 .unwrap_or_else(|| Box::new(EmptyAsyncRead));480 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());481 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());482483 484485 let mut out_buf = if want_stdout { Some(vec![]) } else { None };486 loop {487 select! {488 e = err.next() => {489 if let Some(e) = e {490 let e = e?;491 err_handler.handle_line(&e);492 }493 },494 o = ob.next() => {495 if let Some(o) = o {496 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);497 }498 },499 o = ol.next() => {500 if let Some(o) = o {501 let o = o?;502 if let Some(out) = out_handler.as_mut() {503 out.handle_line(&o)504 } else {505 err_handler.handle_line(&o)506 }507 508 }509 },510 code = child.wait() => {511 let code = code?;512 if !code.success() {513 anyhow::bail!("command '{str}' failed with status {}", code);514 }515 break;516 }517 }518 }519520 Ok(out_buf.map(String::from_utf8).transpose()?)521}522523pub trait ErrorRecorder: Send {524 525 fn push_message(&mut self, msg: &str) -> bool;526}527528#[derive(Debug)]529enum LogField {530 String(String),531 Num(u64),532}533534impl<'de> Deserialize<'de> for LogField {535 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>536 where537 D: serde::Deserializer<'de>,538 {539 struct StringOrNum;540 impl<'de> Visitor<'de> for StringOrNum {541 type Value = LogField;542543 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {544 write!(f, "string or unsigned")545 }546547 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>548 where549 E: serde::de::Error,550 {551 Ok(LogField::String(v.to_owned()))552 }553554 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>555 where556 E: serde::de::Error,557 {558 Ok(LogField::Num(v))559 }560 }561562 deserializer.deserialize_any(StringOrNum)563 }564}565566#[derive(Deserialize, Debug)]567#[serde(rename_all = "camelCase", tag = "action")]568#[allow(dead_code)]569enum NixLog {570 Msg {571 level: u32,572 msg: String,573 raw_msg: Option<String>,574 },575 Start {576 id: u64,577 level: u32,578 #[serde(default)]579 fields: Vec<LogField>,580 text: String,581 #[serde(rename = "type")]582 typ: u32,583 },584 Stop {585 id: u64,586 },587 Result {588 id: u64,589 #[serde(rename = "type")]590 typ: u32,591 #[serde(default)]592 fields: Vec<LogField>,593 },594}