1use std::{2 collections::HashMap,3 ffi::OsStr,4 process::Stdio,5 sync::{Arc, Mutex},6 task::Poll,7};89use anyhow::Result;10use futures::StreamExt;11use serde::{de::Visitor, Deserialize};12use tokio::{io::AsyncRead, process::Command, select};13use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};14use tracing::{info, info_span, warn, Span};15use tracing_indicatif::span_ext::IndicatifSpanExt;1617fn escape_bash(input: &str, out: &mut String) {18 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";19 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {20 out.push_str(input);21 return;22 }23 out.push('\'');24 for (i, v) in input.split('\'').enumerate() {25 if i != 0 {26 out.push_str("'\"'\"'");27 }28 out.push_str(v);29 }30 out.push('\'');31}32fn ostoutf8(os: impl AsRef<OsStr>) -> String {33 os.as_ref().to_str().expect("non-utf8 data").to_owned()34}35#[derive(Clone)]36pub struct MyCommand {37 command: String,38 args: Vec<String>,39 env: Vec<(String, String)>,40}41impl MyCommand {42 pub fn new(cmd: impl AsRef<OsStr>) -> Self {43 assert!(!cmd.as_ref().is_empty());44 Self {45 command: ostoutf8(cmd),46 args: vec![],47 env: vec![],48 }49 }50 fn into_args(self) -> Vec<String> {51 let mut out = Vec::new();52 if !self.env.is_empty() {53 out.push("env".to_owned());54 for (k, v) in self.env {55 assert!(!k.contains('='));56 out.push(format!("{k}={v}"));57 }58 }59 out.push(self.command);60 out.extend(self.args);61 out62 }63 fn into_string(self) -> String {64 let mut out = String::new();65 if !self.env.is_empty() {66 out.push_str("env");67 for (k, v) in self.env {68 out.push(' ');69 assert!(!k.contains('='));70 escape_bash(&k, &mut out);71 out.push('=');72 escape_bash(&v, &mut out);73 }74 }75 if !out.is_empty() {76 out.push(' ');77 }78 escape_bash(&self.command, &mut out);79 for arg in self.args {80 out.push(' ');81 escape_bash(&arg, &mut out);82 }83 out84 }85 fn into_command(self) -> Command {86 let mut out = Command::new(self.command);87 out.args(self.args);88 for (k, v) in self.env {89 out.env(k, v);90 }91 out92 }93 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {94 let arg = arg.as_ref();95 self.args.push(ostoutf8(arg));96 self97 }98 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {99 let arg = arg.as_ref();100 let value = value.as_ref();101 let arg = ostoutf8(arg);102 let value = ostoutf8(value);103 self.arg(format!("{arg}={value}"));104 self105 }106 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {107 self.arg(arg);108 self.arg(value);109 self110 }111 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {112 for arg in args.into_iter() {113 let arg = arg.as_ref();114 self.args.push(ostoutf8(arg));115 }116 self117 }118 pub fn sudo(self) -> Self {119 let mut out = Self::new("sudo");120 out.args(self.into_args());121 out122 }123 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {124 let mut out = Self::new("ssh");125 out.arg(on).arg("--");126 out.arg(self.into_string());127 out128 }129130 pub async fn run(self) -> Result<()> {131 let str = self.clone().into_string();132 let cmd = self.into_command();133 run_nix_inner(str, cmd, &mut PlainHandler).await?;134 Ok(())135 }136 pub async fn run_string(self) -> Result<String> {137 let str = self.clone().into_string();138 let cmd = self.into_command();139 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;140 Ok(v)141 }142143 pub async fn run_nix_string(self) -> Result<String> {144 let str = self.clone().into_string();145 let mut cmd = self.into_command();146 cmd.arg("--log-format").arg("internal-json");147 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148 }149 pub async fn run_nix(self) -> Result<()> {150 let str = self.clone().into_string();151 let mut cmd = self.into_command();152 cmd.arg("--log-format").arg("internal-json");153 cmd.stdout(Stdio::inherit());154 run_nix_inner(str, cmd, &mut NixHandler::default()).await155 }156}157158struct EmptyAsyncRead;159impl AsyncRead for EmptyAsyncRead {160 fn poll_read(161 self: std::pin::Pin<&mut Self>,162 _cx: &mut std::task::Context<'_>,163 _buf: &mut tokio::io::ReadBuf<'_>,164 ) -> Poll<std::io::Result<()>> {165 Poll::Pending166 }167}168169async fn run_nix_inner_stdout(170 str: String,171 cmd: Command,172 handler: &mut dyn Handler,173) -> Result<String> {174 Ok(run_nix_inner_raw(str, cmd, true, handler, None)175 .await?176 .expect("has out"))177}178async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {179 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;180 assert!(v.is_none());181 Ok(())182}183184pub trait Handler: Send {185 fn handle_line(&mut self, e: &str);186}187188pub struct ClonableHandler<H>(Arc<Mutex<H>>);189impl<H> Clone for ClonableHandler<H> {190 fn clone(&self) -> Self {191 Self(self.0.clone())192 }193}194impl<H> ClonableHandler<H> {195 pub fn new(inner: H) -> Self {196 Self(Arc::new(Mutex::new(inner)))197 }198}199impl<H: Handler> Handler for ClonableHandler<H> {200 fn handle_line(&mut self, e: &str) {201 self.0.lock().unwrap().handle_line(e)202 }203}204205struct PlainHandler;206impl Handler for PlainHandler {207 fn handle_line(&mut self, e: &str) {208 info!(target: "log", "{e}");209 }210}211212pub struct NoopHandler;213impl Handler for NoopHandler {214 fn handle_line(&mut self, _e: &str) {}215}216217#[derive(Default)]218pub struct NixHandler {219 spans: HashMap<u64, Span>,220}221impl Handler for NixHandler {222 fn handle_line(&mut self, e: &str) {223 if let Some(e) = e.strip_prefix("@nix ") {224 let log: NixLog = match serde_json::from_str(e) {225 Ok(l) => l,226 Err(err) => {227 warn!("failed to parse nix log line {:?}: {}", e, err);228 return;229 }230 };231 match log {232 NixLog::Msg { msg, raw_msg, .. } => {233 #[allow(clippy::nonminimal_bool)]234 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))235 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")236 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {237 if let Some(raw_msg) = raw_msg {238 if !msg.is_empty() {239 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())240 } else {241 info!(target: "nix", "{}", raw_msg.trim_end())242 }243 } else {244 info!(target: "nix", "{}", msg.trim_end())245 }246 }247 }248 NixLog::Start {249 ref fields,250 typ,251 id,252 ..253 } if typ == 105 && !fields.is_empty() => {254 if let [LogField::String(drv), ..] = &fields[..] {255 let mut drv = drv.as_str();256 if let Some(pkg) = drv.strip_prefix("/nix/store/") {257 let mut it = pkg.splitn(2, '-');258 it.next();259 if let Some(pkg) = it.next() {260 drv = pkg;261 }262 }263 info!(target: "nix","building {}", drv);264 let span = info_span!("build", drv);265 span.pb_start();266 self.spans.insert(id, span);267 } else {268 warn!("bad build log: {:?}", log)269 }270 }271 NixLog::Start {272 ref fields,273 typ,274 id,275 ..276 } if typ == 100 && fields.len() >= 3 => {277 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =278 &fields[..]279 {280 let mut drv = drv.as_str();281282 if let Some(pkg) = drv.strip_prefix("/nix/store/") {283 let mut it = pkg.splitn(2, '-');284 it.next();285 if let Some(pkg) = it.next() {286 drv = pkg;287 }288 }289 290 let span = info_span!("copy", from, to, drv);291 span.pb_start();292 self.spans.insert(id, span);293 } else {294 warn!("bad copy log: {:?}", log)295 }296 }297 NixLog::Start { text, typ, id, .. }298 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>299 {300 if !text.is_empty()301 && text != "querying info about missing paths"302 && text != "copying 0 paths"303 {304 let span = info_span!("job");305 span.pb_start();306 span.pb_set_message(text.trim());307 self.spans.insert(id, span);308 info!(target: "nix", "{}", text);309 }310 }311 NixLog::Start {312 text,313 level: 0,314 typ: 108,315 ..316 } if text.is_empty() => {317 318 }319 NixLog::Start {320 text,321 level: 4,322 typ: 109,323 ..324 } if text.starts_with("querying info about ") => {325 326 }327 NixLog::Start {328 text,329 level: 4,330 typ: 101,331 ..332 } if text.starts_with("downloading ") => {333 334 }335 NixLog::Start {336 text,337 level: 1,338 typ: 111,339 ..340 } if text.starts_with("waiting for a machine to build ") => {341 342 }343 NixLog::Start {344 text,345 level: 3,346 typ: 111,347 ..348 } if text.starts_with("resolved derivation: ") => {349 350 }351 NixLog::Start {352 text,353 level: 1,354 typ: 111,355 id,356 ..357 } if text.starts_with("waiting for lock on ") => {358 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();359 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {360 drv = txt;361 }362 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {363 drv = txt;364 }365 if let Some(txt) = drv.split("', '").next() {366 drv = txt;367 }368 if let Some(pkg) = drv.strip_prefix("/nix/store/") {369 let mut it = pkg.splitn(2, '-');370 it.next();371 if let Some(pkg) = it.next() {372 drv = pkg;373 }374 }375 let span = info_span!("waiting on drv", drv);376 span.pb_start();377 self.spans.insert(id, span);378 379 }380 NixLog::Stop { id, .. } => {381 self.spans.remove(&id);382 }383 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {384 if let Some(span) = self.spans.get(&id) {385 if let LogField::String(s) = &fields[0] {386 span.pb_set_message(s.trim());387 } else {388 warn!("bad fields: {fields:?}");389 }390 } else {391 warn!("unknown result id: {id} {typ} {fields:?}");392 }393 394 }395 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {396 if let Some(span) = self.spans.get(&id) {397 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =398 &fields[..4]399 {400 span.pb_set_length(*expected);401 span.pb_set_position(*done);402 } else {403 warn!("bad fields: {fields:?}");404 }405 } else {406 407 408 }409 410 }411 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {412 413 }414 _ => warn!("unknown log: {:?}", log),415 };416 } else {417 let e = e.trim();418 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {419 return;420 }421 info!("{e}")422 }423 }424}425426async fn run_nix_inner_raw(427 str: String,428 mut cmd: Command,429 want_stdout: bool,430 err_handler: &mut dyn Handler,431 mut out_handler: Option<&mut dyn Handler>,432) -> Result<Option<String>> {433 cmd.stderr(Stdio::piped());434 cmd.stdout(Stdio::piped());435 let mut child = cmd.spawn()?;436 let mut stderr = child.stderr.take().unwrap();437 let stdout = child.stdout.take().unwrap();438 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());439 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));440 let mut ob = want_stdout441 .then(|| out.take().unwrap())442 .unwrap_or_else(|| Box::new(EmptyAsyncRead));443 let mut ol = (!want_stdout)444 .then(|| out.take().unwrap())445 .unwrap_or_else(|| Box::new(EmptyAsyncRead));446 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());447 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());448449 450451 let mut out_buf = if want_stdout { Some(vec![]) } else { None };452 loop {453 select! {454 e = err.next() => {455 if let Some(e) = e {456 let e = e?;457 err_handler.handle_line(&e);458 }459 },460 o = ob.next() => {461 if let Some(o) = o {462 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);463 }464 },465 o = ol.next() => {466 if let Some(o) = o {467 let o = o?;468 if let Some(out) = out_handler.as_mut() {469 out.handle_line(&o)470 } else {471 err_handler.handle_line(&o)472 }473 474 }475 },476 code = child.wait() => {477 let code = code?;478 if !code.success() {479 anyhow::bail!("command '{str}' failed with status {}", code);480 }481 break;482 }483 }484 }485486 Ok(out_buf.map(String::from_utf8).transpose()?)487}488489pub trait ErrorRecorder: Send {490 491 fn push_message(&mut self, msg: &str) -> bool;492}493494#[derive(Debug)]495enum LogField {496 String(String),497 Num(u64),498}499500impl<'de> Deserialize<'de> for LogField {501 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>502 where503 D: serde::Deserializer<'de>,504 {505 struct StringOrNum;506 impl<'de> Visitor<'de> for StringOrNum {507 type Value = LogField;508509 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {510 write!(f, "string or unsigned")511 }512513 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>514 where515 E: serde::de::Error,516 {517 Ok(LogField::String(v.to_owned()))518 }519520 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>521 where522 E: serde::de::Error,523 {524 Ok(LogField::Num(v))525 }526 }527528 deserializer.deserialize_any(StringOrNum)529 }530}531532#[derive(Deserialize, Debug)]533#[serde(rename_all = "camelCase", tag = "action")]534#[allow(dead_code)]535enum NixLog {536 Msg {537 level: u32,538 msg: String,539 raw_msg: Option<String>,540 },541 Start {542 id: u64,543 level: u32,544 #[serde(default)]545 fields: Vec<LogField>,546 text: String,547 #[serde(rename = "type")]548 typ: u32,549 },550 Stop {551 id: u64,552 },553 Result {554 id: u64,555 #[serde(rename = "type")]556 typ: u32,557 #[serde(default)]558 fields: Vec<LogField>,559 },560}