1use std::{2 collections::HashMap,3 ffi::OsStr,4 pin,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: Some(session),55 }56 }57 pub fn new(cmd: impl AsRef<OsStr>) -> Self {58 assert!(!cmd.as_ref().is_empty());59 Self {60 command: ostoutf8(cmd),61 args: vec![],62 env: vec![],63 ssh_session: None,64 }65 }66 fn into_args(self) -> Vec<String> {67 let mut out = Vec::new();68 if !self.env.is_empty() {69 out.push("env".to_owned());70 for (k, v) in self.env {71 assert!(!k.contains('='));72 out.push(format!("{k}={v}"));73 }74 }75 out.push(self.command);76 out.extend(self.args);77 out78 }7980 81 82 83 84 85 fn translate_env_into_env(self) -> Self {86 if self.env.is_empty() {87 return self;88 }89 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {94 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));96 }97 out.arg(self.command);98 out.args(self.args);99100 out101 }102 fn into_string(self) -> String {103 let mut out = String::new();104 if !self.env.is_empty() {105 out.push_str("env");106 for (k, v) in self.env {107 out.push(' ');108 assert!(!k.contains('='));109 escape_bash(&k, &mut out);110 out.push('=');111 escape_bash(&v, &mut out);112 }113 }114 if !out.is_empty() {115 out.push(' ');116 }117 escape_bash(&self.command, &mut out);118 for arg in self.args {119 out.push(' ');120 escape_bash(&arg, &mut out);121 }122 out123 }124 fn into_command(self) -> Command {125 let mut out = Command::new(self.command);126 out.args(self.args);127 for (k, v) in self.env {128 out.env(k, v);129 }130 out131 }132 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133 Ok(if let Some(session) = self.ssh_session.clone() {134 let cmd = self.translate_env_into_env().into_command();135 Either::Right(136 cmd.over_ssh(session)137 .map_err(|e| anyhow!("ssh error: {e}"))?,138 )139 } else {140 let cmd = self.into_command();141 Either::Left(cmd)142 })143 }144 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145 let arg = arg.as_ref();146 self.args.push(ostoutf8(arg));147 self148 }149 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150 let arg = arg.as_ref();151 let value = value.as_ref();152 let arg = ostoutf8(arg);153 let value = ostoutf8(value);154 self.arg(format!("{arg}={value}"));155 self156 }157 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 self.arg(arg);159 self.arg(value);160 self161 }162 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163 self.env164 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));165 self166 }167 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168 for arg in args.into_iter() {169 let arg = arg.as_ref();170 self.args.push(ostoutf8(arg));171 }172 self173 }174 pub fn sudo(mut self) -> Self {175 if std::env::var_os("NO_SUDO").is_some() {176 let mut out = Self::new("su");177 out.ssh_session = self.ssh_session.take();178 out.arg("-c").arg(self.into_string());179 out180 } else {181 let mut out = Self::new("sudo");182 out.args(self.into_args());183 out184 }185 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();193 out.arg(on).arg("--");194 out.arg(self.into_string());195 out196 }197198 pub async fn run(self) -> Result<()> {199 let str = self.clone().into_string();200 let cmd = self.into_command_new()?;201 match cmd {202 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204 };205 Ok(())206 }207 pub async fn run_string(self) -> Result<String> {208 let bytes = self.run_bytes().await?;209 Ok(String::from_utf8(bytes)?)210 }211 pub async fn run_bytes(self) -> Result<Vec<u8>> {212 let str = self.clone().into_string();213 let cmd = self.into_command_new()?;214 let v = match cmd {215 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217 };218 Ok(v)219 }220221 pub async fn run_nix_string(self) -> Result<String> {222 let str = self.clone().into_string();223 let mut cmd = self.into_command();224 cmd.arg("--log-format").arg("internal-json");225 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226 Ok(String::from_utf8(bytes)?)227 }228 pub async fn run_nix(self) -> Result<()> {229 let str = self.clone().into_string();230 let mut cmd = self.into_command();231 cmd.arg("--log-format").arg("internal-json");232 cmd.stdout(Stdio::inherit());233 run_nix_inner(str, cmd, &mut NixHandler::default()).await234 }235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239 fn poll_read(240 self: std::pin::Pin<&mut Self>,241 _cx: &mut std::task::Context<'_>,242 _buf: &mut tokio::io::ReadBuf<'_>,243 ) -> Poll<std::io::Result<()>> {244 Poll::Pending245 }246}247248async fn run_nix_inner_stdout(249 str: String,250 cmd: Command,251 handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253 Ok(run_nix_inner_raw(str, cmd, true, handler, None)254 .await?255 .expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259 assert!(v.is_none());260 Ok(())261}262async fn run_nix_inner_stdout_ssh(263 str: String,264 cmd: OwningCommand<Arc<Session>>,265 handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268 .await?269 .expect("has out"))270}271async fn run_nix_inner_ssh(272 str: String,273 cmd: OwningCommand<Arc<Session>>,274 handler: &mut dyn Handler,275) -> Result<()> {276 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277 assert!(v.is_none());278 Ok(())279}280281pub trait Handler: Send {282 fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287 fn clone(&self) -> Self {288 Self(self.0.clone())289 }290}291impl<H> ClonableHandler<H> {292 pub fn new(inner: H) -> Self {293 Self(Arc::new(Mutex::new(inner)))294 }295}296impl<H: Handler> Handler for ClonableHandler<H> {297 fn handle_line(&mut self, e: &str) {298 self.0.lock().unwrap().handle_line(e)299 }300}301302struct PlainHandler;303impl Handler for PlainHandler {304 fn handle_line(&mut self, e: &str) {305 info!(target: "log", "{e}");306 }307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311 fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316 spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319 static OSC_CLEANER: Lazy<Regex> =320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322 let m = OSC_CLEANER.replace_all(m, "");323 324 325 DETABBER.replace_all(m.as_ref(), " ").to_string()326}327impl Handler for NixHandler {328 fn handle_line(&mut self, e: &str) {329 if let Some(e) = e.strip_prefix("@nix ") {330 let log: NixLog = match serde_json::from_str(e) {331 Ok(l) => l,332 Err(err) => {333 warn!("failed to parse nix log line {:?}: {}", e, err);334 return;335 }336 };337 match log {338 NixLog::Msg { msg, raw_msg, .. } => {339 #[allow(clippy::nonminimal_bool)]340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343 if let Some(raw_msg) = raw_msg {344 if !msg.is_empty() {345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346 } else {347 info!(target: "nix", "{}", raw_msg.trim_end())348 }349 } else {350 info!(target: "nix", "{}", msg.trim_end())351 }352 }353 }354 NixLog::Start {355 ref fields,356 typ,357 id,358 ..359 } if typ == 105 && !fields.is_empty() => {360 if let [LogField::String(drv), ..] = &fields[..] {361 let mut drv = drv.as_str();362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {363 let mut it = pkg.splitn(2, '-');364 it.next();365 if let Some(pkg) = it.next() {366 drv = pkg;367 }368 }369 info!(target: "nix","building {}", drv);370 let span = info_span!("build", drv);371 span.pb_start();372 self.spans.insert(id, span);373 } else {374 warn!("bad build log: {:?}", log)375 }376 }377 NixLog::Start {378 ref fields,379 typ,380 id,381 ..382 } if typ == 100 && fields.len() >= 3 => {383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384 &fields[..]385 {386 let mut drv = drv.as_str();387388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {389 let mut it = pkg.splitn(2, '-');390 it.next();391 if let Some(pkg) = it.next() {392 drv = pkg;393 }394 }395 396 let span = info_span!("copy", from, to, drv);397 span.pb_start();398 self.spans.insert(id, span);399 } else {400 warn!("bad copy log: {:?}", log)401 }402 }403 NixLog::Start { text, typ, id, .. }404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405 {406 if !text.is_empty()407 && text != "querying info about missing paths"408 && text != "copying 0 paths"409 410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))411 {412 let span = info_span!("job");413 span.pb_start();414 span.pb_set_message(&process_message(text.trim()));415 self.spans.insert(id, span);416 info!(target: "nix", "{}", text);417 }418 }419 NixLog::Start {420 text,421 level: 0,422 typ: 108,423 ..424 } if text.is_empty() => {425 426 }427 NixLog::Start {428 text,429 level: 4,430 typ: 109,431 ..432 } if text.starts_with("querying info about ") => {433 434 }435 NixLog::Start {436 text,437 level: 4,438 typ: 101,439 ..440 } if text.starts_with("downloading ") => {441 442 }443 NixLog::Start {444 text,445 level: 1,446 typ: 111,447 ..448 } if text.starts_with("waiting for a machine to build ") => {449 450 }451 NixLog::Start {452 text,453 level: 3,454 typ: 111,455 ..456 } if text.starts_with("resolved derivation: ") => {457 458 }459 NixLog::Start {460 text,461 level: 1,462 typ: 111,463 id,464 ..465 } if text.starts_with("waiting for lock on ") => {466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468 drv = txt;469 }470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471 drv = txt;472 }473 if let Some(txt) = drv.split("', '").next() {474 drv = txt;475 }476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {477 let mut it = pkg.splitn(2, '-');478 it.next();479 if let Some(pkg) = it.next() {480 drv = pkg;481 }482 }483 let span = info_span!("waiting on drv", drv);484 span.pb_start();485 self.spans.insert(id, span);486 487 }488 NixLog::Stop { id, .. } => {489 self.spans.remove(&id);490 }491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492 if let Some(span) = self.spans.get(&id) {493 if let LogField::String(s) = &fields[0] {494 span.pb_set_message(&process_message(s.trim()));495 } else {496 warn!("bad fields: {fields:?}");497 }498 } else {499 warn!("unknown result id: {id} {typ} {fields:?}");500 }501 502 }503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504 if let Some(span) = self.spans.get(&id) {505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506 &fields[..4]507 {508 span.pb_set_length(*expected);509 span.pb_set_position(*done);510 } else {511 warn!("bad fields: {fields:?}");512 }513 } else {514 515 516 }517 518 }519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520 521 }522 _ => warn!("unknown log: {:?}", log),523 };524 } else {525 let e = e.trim();526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527 return;528 }529 info!("{e}")530 }531 }532}533534async fn run_nix_inner_raw(535 str: String,536 mut cmd: Command,537 want_stdout: bool,538 err_handler: &mut dyn Handler,539 mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541 cmd.stderr(Stdio::piped());542 cmd.stdout(Stdio::piped());543 let mut child = cmd.spawn()?;544 let mut stderr = child.stderr.take().unwrap();545 let stdout = child.stdout.take().unwrap();546 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548 let mut ob = want_stdout549 .then(|| out.take().unwrap())550 .unwrap_or_else(|| Box::new(EmptyAsyncRead));551 let mut ol = (!want_stdout)552 .then(|| out.take().unwrap())553 .unwrap_or_else(|| Box::new(EmptyAsyncRead));554 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557 558559 let mut out_buf = if want_stdout { Some(vec![]) } else { None };560 loop {561 select! {562 e = err.next() => {563 if let Some(e) = e {564 let e = e?;565 err_handler.handle_line(&e);566 }567 },568 o = ob.next() => {569 if let Some(o) = o {570 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571 }572 },573 o = ol.next() => {574 if let Some(o) = o {575 let o = o?;576 if let Some(out) = out_handler.as_mut() {577 out.handle_line(&o)578 } else {579 err_handler.handle_line(&o)580 }581 582 }583 },584 code = child.wait() => {585 let code = code?;586 if !code.success() {587 anyhow::bail!("command '{str}' failed with status {}", code);588 }589 break;590 }591 }592 }593594 Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597 str: String,598 mut cmd: OwningCommand<Arc<Session>>,599 want_stdout: bool,600 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;606 let mut stderr = child.stderr().take().unwrap();607 let stdout = child.stdout().take().unwrap();608 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610 let mut ob = want_stdout611 .then(|| out.take().unwrap())612 .unwrap_or_else(|| Box::new(EmptyAsyncRead));613 let mut ol = (!want_stdout)614 .then(|| out.take().unwrap())615 .unwrap_or_else(|| Box::new(EmptyAsyncRead));616 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619 620621 let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623 let mut wait_future = pin::pin!(child.wait());624 loop {625 select! {626 e = err.next() => {627 if let Some(e) = e {628 let e = e?;629 err_handler.handle_line(&e);630 }631 },632 o = ob.next() => {633 if let Some(o) = o {634 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635 }636 },637 o = ol.next() => {638 if let Some(o) = o {639 let o = o?;640 if let Some(out) = out_handler.as_mut() {641 out.handle_line(&o)642 } else {643 err_handler.handle_line(&o)644 }645 646 }647 },648 code = &mut wait_future => {649 let code = code?;650 if !code.success() {651 anyhow::bail!("command '{str}' failed with status {}", code);652 }653 break;654 }655 }656 }657658 Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662 663 fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668 String(String),669 Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674 where675 D: serde::Deserializer<'de>,676 {677 struct StringOrNum;678 impl<'de> Visitor<'de> for StringOrNum {679 type Value = LogField;680681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682 write!(f, "string or unsigned")683 }684685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686 where687 E: serde::de::Error,688 {689 Ok(LogField::String(v.to_owned()))690 }691692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693 where694 E: serde::de::Error,695 {696 Ok(LogField::Num(v))697 }698 }699700 deserializer.deserialize_any(StringOrNum)701 }702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708 Msg {709 level: u32,710 msg: String,711 raw_msg: Option<String>,712 },713 Start {714 id: u64,715 level: u32,716 #[serde(default)]717 fields: Vec<LogField>,718 text: String,719 #[serde(rename = "type")]720 typ: u32,721 },722 Stop {723 id: u64,724 },725 Result {726 id: u64,727 #[serde(rename = "type")]728 typ: u32,729 #[serde(default)]730 fields: Vec<LogField>,731 },732}