difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 // fn dedent(s: String) -> String {78 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79 // }80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 // We should receive errors earlier than synchronization131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 // Reader doesn't care about stdout, as this is cancelled.151 // Error still might be useful, to process leftover span closures?152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 // Standard repl hello doesn't work with internal-json logger191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 // Handle startup errors, but skip repl hello?202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self.execute_expression_string(fexpr).await?;303 Ok(serde_json::from_str(&v)?)304 }305 async fn execute_expression_wrapping(306 &mut self,307 expr: impl AsRef<[u8]>,308 wrapping: &(String, String),309 ) -> Result<String> {310 let mut nix_handler = self.nix_handler.clone();311 let mut collected = ErrorCollector::new(&mut nix_handler);312 let res = self.execute_expression_raw(expr, &mut collected).await?;313 if res.is_empty() {314 collected.finish()?;315 bail!("expected expression, got nothing")316 } else {317 collected.flush()318 };319 let Some(res) = res.strip_prefix(&wrapping.0) else {320 bail!("invalid type")321 };322 let Some(res) = res.strip_suffix(&wrapping.1) else {323 bail!("invalid type")324 };325 Ok(res.to_owned())326 }327 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {328 let mut nix_handler = self.nix_handler.clone();329 let mut collected = ErrorCollector::new(&mut nix_handler);330 let v = self.execute_expression_raw(expr, &mut collected).await?;331 collected.finish()?;332 ensure!(v.is_empty(), "unexpected expression result");333 Ok(())334 }335 async fn execute_expression_raw(336 &mut self,337 expr: impl AsRef<[u8]>,338 err_handler: &mut dyn Handler,339 ) -> Result<String> {340 // Prevent two commands from being executed in parallel, messing with each other.341 let _lock = self.executing_command.clone();342 let _guard = _lock.lock().await;343344 self.send_command(expr).await?;345 // It will be echoed346 self.send_command(REPL_DELIMITER).await?;347 self.read_until_delimiter(err_handler).await348 }349 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {350 let id = self.allocate_id();351 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))352 .await?;353 Ok(id)354 }355356 /// Id should be immediately used357 fn allocate_id(&mut self) -> u32 {358 if let Some(free) = self.free_list.pop() {359 free360 } else {361 let v = self.next_id;362 self.next_id += 1;363 v364 }365 }366 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.367 // async fn free_id(&mut self, id: u32) -> Result<()> {368 // self.execute_expression_empty(format!("sess_field_{id} = null"))369 // .await?;370 // self.free_list.push(id);371 // Ok(())372 // }373}374375#[derive(Clone)]376pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);377378#[derive(Clone)]379pub struct NixExprBuilder {380 out: String,381 used_fields: Vec<Field>,382}383impl NixExprBuilder {384 pub fn object() -> Self {385 NixExprBuilder {386 out: "{ ".to_owned(),387 used_fields: Vec::new(),388 }389 }390 pub fn string(s: &str) -> Self {391 NixExprBuilder {392 out: nixlike::serialize(s)393 .expect("no problems with serializing_string")394 .trim_end()395 .to_owned(),396 used_fields: Vec::new(),397 }398 }399 pub fn serialized(v: impl Serialize) -> Self {400 let serialized = nixlike::serialize(v).expect("invalid value for apply");401 Self {402 out: serialized.trim_end().to_owned(),403 used_fields: Vec::new(),404 }405 }406 pub fn field(f: Field) -> Self {407 Self {408 out: format!("sess_field_{}", f.0.value.expect("no value")),409 used_fields: vec![f],410 }411 }412 pub fn end_obj(&mut self) {413 self.out.push('}');414 }415 pub fn obj_key(&mut self, name: Self, value: Self) {416 self.out.push_str(r#""${"#);417 self.extend(name);418 self.out.push_str(r#"}" = "#);419 self.extend(value);420 self.out.push_str("; ");421 }422423 pub fn extend(&mut self, e: Self) {424 self.out.push_str(&e.out);425 self.used_fields.extend(e.used_fields);426 }427428 pub fn session(&self) -> NixSession {429 let mut session = None;430 for ele in &self.used_fields {431 if session.is_none() {432 session = Some(ele.0.session.clone());433 continue;434 }435 let session = &session.as_ref().expect("checked").0;436 let ele_sess = &ele.0.session.0;437 assert!(438 Arc::ptr_eq(session, ele_sess),439 "can't mix fields from different session"440 );441 }442 session.expect("expr without fields used")443 }444 pub fn index_attr(&mut self, s: &str) {445 let escaped = nixlike::serialize(s).expect("string");446 self.out.push('.');447 self.out.push_str(escaped.trim_end());448 }449}450451#[macro_export]452macro_rules! nix_expr_inner {453 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{454 use $crate::better_nix_eval::NixExprBuilder;455 let mut out = NixExprBuilder::object();456 $(457 out.obj_key(458 NixExprBuilder::string(stringify!($ident)),459 $crate::nix_expr_inner!($($val)+),460 );461 )*462 out.end_obj();463 out464 }};465 (@field($o:ident) . $var:ident $($tt:tt)*) => {{466 $o.index_attr(stringify!($var));467 nix_expr_inner!(@field($o) $($tt)*);468 }};469 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{470 $o.push(Index::attr(&$v));471 nix_expr_inner!(@o($o) $($tt)*);472 }};473 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{474 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));475 nix_expr_inner!(@o($o) $($tt)*);476 }};477 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {478 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));479 nix_expr_inner!(@o($o) $($tt)*);480 };481 (@field($o:ident)) => {};482 ($field:ident $($tt:tt)*) => {{483 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};484 #[allow(unused_mut, reason = "might be used if indexed")]485 let mut out = NixExprBuilder::field($field.clone());486 nix_expr_inner!(@field(out) $($tt)*);487 out488 }};489 ($v:literal) => {{490 use $crate::better_nix_eval::NixExprBuilder;491 NixExprBuilder::string($v)492 }};493 ({$v:expr}) => {{494 use $crate::better_nix_eval::NixExprBuilder;495 NixExprBuilder::serialized(&$v)496 }}497}498#[macro_export]499macro_rules! nix_expr {500 ($($tt:tt)+) => {{501 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};502 let expr = nix_expr_inner!($($tt)+);503 Field::new(expr.session(), expr.out)504 }};505}506507#[macro_export]508macro_rules! nix_go {509 (@o($o:ident) . $var:ident $($tt:tt)*) => {{510 $o.push(Index::attr(stringify!($var)));511 nix_go!(@o($o) $($tt)*);512 }};513 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{514 $o.push(Index::attr(&$v));515 nix_go!(@o($o) $($tt)*);516 }};517 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{518 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));519 nix_go!(@o($o) $($tt)*);520 }};521 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {522 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));523 nix_go!(@o($o) $($tt)*);524 };525 (@o($o:ident)) => {};526 ($field:ident $($tt:tt)+) => {{527 use $crate::{nix_go, better_nix_eval::Index};528 let field = $field.clone();529 let mut out = vec![];530 nix_go!(@o(out) $($tt)*);531 field.select(out).await?532 }}533}534#[macro_export]535macro_rules! nix_go_json {536 ($($tt:tt)*) => {{537 $crate::nix_go!($($tt)*).as_json().await?538 }};539}540541#[derive(Clone)]542pub enum Index {543 Var(String),544 String(String),545 Apply(String),546 Expr(NixExprBuilder),547 ExprApply(NixExprBuilder),548}549impl Index {550 pub fn var(v: impl AsRef<str>) -> Self {551 let v = v.as_ref();552 assert!(553 !(v.contains('.') | v.contains(' ')),554 "bad variable name: {v}"555 );556 Self::Var(v.to_owned())557 }558 pub fn attr(v: impl AsRef<str>) -> Self {559 Self::String(v.as_ref().to_owned())560 }561 pub fn apply(v: impl Serialize) -> Self {562 let serialized = nixlike::serialize(v).expect("invalid value for apply");563 Self::Apply(serialized.trim_end().to_owned())564 }565}566impl Display for Index {567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {568 match self {569 Index::Var(v) => {570 write!(f, "{v}")571 }572 Index::String(k) => {573 let v = nixlike::format_identifier(k.as_str());574 write!(f, ".{v}")575 }576 Index::Apply(o) => {577 write!(f, "<apply>({o})")578 }579 Index::Expr(e) => {580 write!(f, "[{}]", e.out)581 }582 Index::ExprApply(e) => {583 write!(f, "<apply>({})", e.out)584 }585 }586 }587}588impl fmt::Debug for Index {589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {590 write!(f, "{self}")591 }592}593struct PathDisplay<'i>(&'i [Index]);594impl Display for PathDisplay<'_> {595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {596 for i in self.0 {597 write!(f, "{i}")?;598 }599 Ok(())600 }601}602struct FieldInner {603 full_path: Option<Vec<Index>>,604 session: NixSession,605 value: Option<u32>,606}607fn context(full_path: Option<&[Index]>, query: &str) -> String {608 if let Some(full_path) = &full_path {609 format!("full path: {}", PathDisplay(full_path))610 } else {611 format!("query: {query:?}")612 }613}614#[derive(Clone)]615pub struct Field(Arc<FieldInner>);616impl Field {617 fn root(session: NixSession) -> Self {618 Self(Arc::new(FieldInner {619 full_path: Some(vec![]),620 session,621 value: None,622 }))623 }624 async fn new(session: NixSession, query: &str) -> Result<Self> {625 let vid = session626 .0627 .lock()628 .await629 .execute_assign(query)630 .await631 .with_context(|| context(None, query))?;632 Ok(Self(Arc::new(FieldInner {633 full_path: None,634 session,635 value: Some(vid),636 })))637 }638 pub async fn field(session: NixSession, field: &str) -> Result<Self> {639 Self::root(session).select([Index::var(field)]).await640 }641 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {642 let mut used_fields = Vec::new();643 let mut name = name.into_iter();644645 let mut full_path = self.0.full_path.clone();646 let mut query = if let Some(id) = self.0.value {647 format!("sess_field_{id}")648 } else {649 let first = name.next();650 if let Some(Index::Var(i)) = first {651 if let Some(full_path) = &mut full_path {652 full_path.push(Index::Var(i.clone()));653 }654 i.clone()655 } else {656 panic!("first path item should be variable, got {first:?}")657 }658 };659 for v in name {660 if let Some(full_path) = &mut full_path {661 full_path.push(v.clone());662 }663 match v {664 Index::Var(_) => panic!("var item may only be first"),665 Index::String(s) => {666 let escaped = nixlike::serialize(s)?;667 query.push('.');668 query.push_str(escaped.trim());669 }670 Index::Apply(a) => {671 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`672 query = format!("({query} {a})");673 }674 Index::Expr(e) => {675 let index = Field::new(self.0.session.clone(), &e.out).await?;676 used_fields.push(index.clone());677 query.push('.');678 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));679 query.push_str(&index);680 }681 Index::ExprApply(e) => {682 let index = Field::new(self.0.session.clone(), &e.out).await?;683 used_fields.push(index.clone());684 query.push(' ');685 let index = format!("sess_field_{}", index.0.value.expect("value"));686 query.push_str(&index);687 query = format!("({query})");688 }689 }690 }691692 let vid = self693 .0694 .session695 .0696 .lock()697 .await698 .execute_assign(&query)699 .await700 .with_context(|| {701 if let Some(full_path) = &full_path {702 format!("full path: {}", PathDisplay(full_path))703 } else {704 format!("query: {query:?}")705 }706 })?;707 Ok(Self(Arc::new(FieldInner {708 full_path,709 session: self.0.session.clone(),710 value: Some(vid),711 })))712 }713 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {714 let id = self.0.value.expect("can't serialize root field");715 let query = format!("sess_field_{id}");716 self.0717 .session718 .0719 .lock()720 .await721 .execute_expression_to_json(&query)722 .await723 .with_context(|| context(self.0.full_path.as_deref(), &query))724 }725 pub async fn has_field(&self, name: &str) -> Result<bool> {726 let id = self.0.value.expect("can't list root fields");727 let key = nixlike::escape_string(name);728 let query = format!("sess_field_{id} ? {key}");729 self.0730 .session731 .0732 .lock()733 .await734 .execute_expression_to_json(&query)735 .await736 .with_context(|| context(self.0.full_path.as_deref(), &query))737 }738 pub async fn list_fields(&self) -> Result<Vec<String>> {739 let id = self.0.value.expect("can't list root fields");740 let query = format!("builtins.attrNames sess_field_{id}");741 self.0742 .session743 .0744 .lock()745 .await746 .execute_expression_to_json(&query)747 .await748 .with_context(|| context(self.0.full_path.as_deref(), &query))749 }750 pub async fn type_of(&self) -> Result<String> {751 let id = self.0.value.expect("can't list root fields");752 let query = format!("builtins.typeOf sess_field_{id}");753 self.0754 .session755 .0756 .lock()757 .await758 .execute_expression_to_json(&query)759 .await760 .with_context(|| context(self.0.full_path.as_deref(), &query))761 }762 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {763 let id = self.0.value.expect("can't use build on not-value");764 let query = format!(":b sess_field_{id}");765 let vid = self766 .0767 .session768 .0769 .lock()770 .await771 .execute_expression_raw(&query, &mut NixHandler::default())772 .await?;773 ensure!(774 !vid.is_empty(),775 "build failed: {}",776 context(self.0.full_path.as_deref(), &query),777 );778 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")779 else {780 panic!("unexpected build output: {vid:?}");781 };782 let outputs = vid783 .split('\n')784 .filter(|v| !v.is_empty())785 .map(|v| v.split_once(" -> ").expect("unexpected build output"))786 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))787 .collect();788 Ok(outputs)789 }790}791impl Drop for FieldInner {792 fn drop(&mut self) {793 if let Some(id) = self.value {794 if let Ok(mut lock) = self.session.0.try_lock() {795 lock.free_list.push(id)796 }797 // Leaked798 }799 }800}801struct NixSessionPoolInner {802 flake: OsString,803 nix_args: Vec<OsString>,804}805806#[derive(Debug)]807pub struct NixPoolError(anyhow::Error);808impl From<anyhow::Error> for NixPoolError {809 fn from(value: anyhow::Error) -> Self {810 Self(value)811 }812}813impl std::error::Error for NixPoolError {}814impl std::fmt::Display for NixPoolError {815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {816 self.0.fmt(f)817 }818}819impl r2d2::ManageConnection for NixSessionPoolInner {820 type Connection = NixSessionInner;821 type Error = NixPoolError;822 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {823 let _v = TOKIO_RUNTIME824 .get()825 .expect("missed tokio runtime init!")826 .enter();827 Ok(futures::executor::block_on(NixSessionInner::new(828 self.flake.as_os_str(),829 self.nix_args.iter().map(OsString::as_os_str),830 ))?)831 }832833 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {834 let _v = TOKIO_RUNTIME835 .get()836 .expect("missed tokio runtime init!")837 .enter();838 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;839 if res != 4 {840 return Err(anyhow!("sanity check failed").into());841 };842 Ok(())843 }844845 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {846 false847 }848}849pub struct NixSessionPool(Pool<NixSessionPoolInner>);850impl NixSessionPool {851 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {852 let inner = tokio::task::block_in_place(|| {853 r2d2::Builder::<NixSessionPoolInner>::new()854 .min_idle(Some(0))855 .build(NixSessionPoolInner { flake, nix_args })856 })?;857 Ok(Self(inner))858 }859 pub async fn get(&self) -> Result<NixSession> {860 let v = tokio::task::block_in_place(|| self.0.get())?;861 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))862 }863}864865pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}