difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26 full_delimiter: String,27 nix_handler: ClonableHandler<NixHandler>,28 out: OutputHandler,29 stdin: ChildStdin,30 string_wrapping: (String, String),31 number_wrapping: (String, String),3233 next_id: u32,34 free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41 collected: Vec<String>,42 inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45 fn new(inner: &'i mut H) -> Self {46 Self {47 collected: vec![],48 inner,49 }50 }51}52impl<H> ErrorCollector<'_, H> {53 fn handle_line_inner(&mut self, msg: &str) -> bool {54 let Some(msg) = msg.strip_prefix("@nix ") else {55 return false;56 };57 #[derive(Deserialize)]58 struct ErrorAction {59 action: String,60 level: u32,61 msg: String,62 }63 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64 return false;65 };66 if act.action != "msg" || act.level != 0 {67 return false;68 }69 self.collected.push(act.msg);70 true71 }72 fn finish(self) -> Result<()> {73 // fn dedent(s: String) -> String {74 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75 // }76 if !self.collected.is_empty() {77 bail!(78 "{}",79 self.collected80 .iter()81 .map(|v| {82 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83 let v = unindent::unindent(f.trim_start());84 v.trim().to_owned()85 } else {86 v.to_owned()87 }88 })89 .join("\n")90 );91 }92 Ok(())93 }94 fn flush(self) {95 for line in self.collected {96 warn!("{line}");97 }98 }99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101 fn handle_line(&mut self, e: &str) {102 if self.handle_line_inner(e) {103 return;104 }105 self.inner.handle_line(e)106 }107}108109enum OutputLine {110 Out(String),111 Err(String),112}113struct OutputHandler {114 rx: mpsc::Receiver<OutputLine>,115 _cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118 fn new(out: ChildStdout, err: ChildStderr) -> Self {119 let mut out = FramedRead::new(out, LinesCodec::new());120 let mut err = FramedRead::new(err, LinesCodec::new());121 let (tx, rx) = mpsc::channel(20);122 let (mut cancelled, _cancel_handle) = oneshot::channel();123 tokio::spawn(async move {124 loop {125 select! {126 // We should receive errors earlier than synchronization127 biased;128 e = err.next() => {129 let Some(Ok(e)) = e else {130 if e.is_some() {131 error!("bad repl stderr: {e:?}");132 }133 continue;134 };135 let _ = tx.send(OutputLine::Err(e)).await;136 }137 o = out.next() => {138 let Some(Ok(o)) = o else {139 if o.is_some() {140 error!("bad repl stdout: {o:?}");141 }142 continue;143 };144 let _ = tx.send(OutputLine::Out(o)).await;145 }146 // Reader doesn't care about stdout, as this is cancelled.147 // Error still might be useful, to process leftover span closures?148 _ = cancelled.closed() => {149 break;150 }151 }152 }153 });154 Self { rx, _cancel_handle }155 }156 async fn next(&mut self) -> Option<OutputLine> {157 self.rx.recv().await158 }159}160161struct WarnHandler;162impl Handler for WarnHandler {163 fn handle_line(&mut self, e: &str) {164 warn!(target: "nix", "{e}")165 }166}167168impl NixSessionInner {169 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170 let mut cmd = Command::new("nix");171 cmd.arg("repl")172 .arg(flake)173 .arg("--log-format")174 .arg("internal-json");175 for arg in extra_args {176 cmd.arg(arg);177 }178 cmd.stdin(Stdio::piped());179 cmd.stdout(Stdio::piped());180 cmd.stderr(Stdio::piped());181 let cmd = cmd.spawn()?;182 let stdout = cmd.stdout.unwrap();183 let stderr = cmd.stderr.unwrap();184 let mut out = OutputHandler::new(stdout, stderr);185 let mut stdin = cmd.stdin.unwrap();186 // Standard repl hello doesn't work with internal-json logger187 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188 stdin.write_all(b"\n").await?;189 stdin.flush().await?;190 let nix_handler = NixHandler::default();191 let mut full_delimiter = None;192 let mut errors = vec![];193 while let Some(line) = out.next().await {194 let line = match line {195 OutputLine::Out(o) => o,196 OutputLine::Err(_e) => {197 // Handle startup errors, but skip repl hello?198 errors.push(_e);199 continue;200 }201 };202 if line.contains(REPL_DELIMITER) {203 debug!("discovered repl delimiter with added colors: {line}");204 full_delimiter = Some(line.to_owned());205 break;206 }207 }208 let Some(full_delimiter) = full_delimiter else {209 for e in errors {210 error!("{e}");211 }212 bail!("failed to discover delimiter");213 };214 let mut res = Self {215 full_delimiter,216 nix_handler: ClonableHandler::new(nix_handler),217 out,218 stdin,219 string_wrapping: Default::default(),220 number_wrapping: Default::default(),221222 next_id: 0,223 free_list: vec![],224 };225 res.train().await?;226 Ok(res)227 }228 async fn train(&mut self) -> Result<()> {229 {230 let full_string = self231 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232 .await?;233 let string_offset = full_string.find(TRAIN_STRING).expect("contained");234 let string_prefix = &full_string[..string_offset];235 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237 }238 {239 let full_number = self240 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241 .await?;242 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243 let number_prefix = &full_number[..number_offset];244 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246 }247 Ok(())248 }249 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {251 let cmd_str = String::from_utf8_lossy(cmd.as_ref());252 tracing::debug!("{cmd_str}");253 };254 self.stdin.write_all(cmd.as_ref()).await?;255 self.stdin.write_all(b"\n").await?;256 Ok(())257 }258 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259 let mut out = String::new();260 while let Some(line) = self.out.next().await {261 let line = match line {262 OutputLine::Out(out) => out,263 OutputLine::Err(err) => {264 err_handler.handle_line(&err);265 continue;266 }267 };268 if line == self.full_delimiter {269 return Ok(out);270 }271 if !out.is_empty() {272 out.push('\n');273 }274 out.push_str(&line);275 }276 bail!("didn't reached delimiter");277 }278 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279 let num = self.number_wrapping.clone();280 let n = self.execute_expression_wrapping(expr, &num).await?;281 Ok(n.parse::<u64>()?)282 }283 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284 let num = self.string_wrapping.clone();285 let n = self.execute_expression_wrapping(expr, &num).await?;286 let str: String = serde_json::from_str(&n)?;287 Ok(str)288 }289 async fn execute_expression_to_json<V: DeserializeOwned>(290 &mut self,291 expr: impl AsRef<[u8]>,292 ) -> Result<V> {293 let mut fexpr = b"builtins.toJSON (".to_vec();294 fexpr.extend_from_slice(expr.as_ref());295 fexpr.push(b')');296 let v = self.execute_expression_string(fexpr).await?;297 Ok(serde_json::from_str(&v)?)298 }299 async fn execute_expression_wrapping(300 &mut self,301 expr: impl AsRef<[u8]>,302 wrapping: &(String, String),303 ) -> Result<String> {304 let mut nix_handler = self.nix_handler.clone();305 let mut collected = ErrorCollector::new(&mut nix_handler);306 let res = self.execute_expression_raw(expr, &mut collected).await?;307 if res.is_empty() {308 collected.finish()?;309 bail!("expected expression, got nothing")310 } else {311 collected.flush()312 };313 let Some(res) = res.strip_prefix(&wrapping.0) else {314 bail!("invalid type")315 };316 let Some(res) = res.strip_suffix(&wrapping.1) else {317 bail!("invalid type")318 };319 Ok(res.to_owned())320 }321 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322 let mut nix_handler = self.nix_handler.clone();323 let mut collected = ErrorCollector::new(&mut nix_handler);324 let v = self.execute_expression_raw(expr, &mut collected).await?;325 collected.finish()?;326 ensure!(v.is_empty(), "unexpected expression result");327 Ok(())328 }329 async fn execute_expression_raw(330 &mut self,331 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,333 ) -> Result<String> {334 self.send_command(expr).await?;335 // It will be echoed336 self.send_command(REPL_DELIMITER).await?;337 self.read_until_delimiter(err_handler).await338 }339 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340 let id = self.allocate_id();341 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342 .await?;343 Ok(id)344 }345346 /// Id should be immediately used347 fn allocate_id(&mut self) -> u32 {348 if let Some(free) = self.free_list.pop() {349 free350 } else {351 let v = self.next_id;352 self.next_id += 1;353 v354 }355 }356 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.357 // async fn free_id(&mut self, id: u32) -> Result<()> {358 // self.execute_expression_empty(format!("sess_field_{id} = null"))359 // .await?;360 // self.free_list.push(id);361 // Ok(())362 // }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370 out: String,371 used_fields: Vec<Field>,372}373impl NixExprBuilder {374 pub fn object() -> Self {375 NixExprBuilder {376 out: "{ ".to_owned(),377 used_fields: Vec::new(),378 }379 }380 pub fn string(s: &str) -> Self {381 NixExprBuilder {382 out: nixlike::serialize(s)383 .expect("no problems with serializing_string")384 .trim_end()385 .to_owned(),386 used_fields: Vec::new(),387 }388 }389 pub fn serialized(v: impl Serialize) -> Self {390 let serialized = nixlike::serialize(v).expect("invalid value for apply");391 Self {392 out: serialized.trim_end().to_owned(),393 used_fields: Vec::new(),394 }395 }396 pub fn field(f: Field) -> Self {397 Self {398 out: format!("sess_field_{}", f.0.value.expect("no value")),399 used_fields: vec![f],400 }401 }402 pub fn end_obj(&mut self) {403 self.out.push('}');404 }405 pub fn obj_key(&mut self, name: Self, value: Self) {406 self.out.push_str(r#""${"#);407 self.extend(name);408 self.out.push_str(r#"}" = "#);409 self.extend(value);410 self.out.push_str("; ");411 }412413 pub fn extend(&mut self, e: Self) {414 self.out.push_str(&e.out);415 self.used_fields.extend(e.used_fields);416 }417418 pub fn session(&self) -> NixSession {419 let mut session = None;420 for ele in &self.used_fields {421 if session.is_none() {422 session = Some(ele.0.session.clone());423 continue;424 }425 let session = &session.as_ref().expect("checked").0;426 let ele_sess = &ele.0.session.0;427 assert!(428 Arc::ptr_eq(session, ele_sess),429 "can't mix fields from different session"430 );431 }432 session.expect("expr without fields used")433 }434 pub fn index_attr(&mut self, s: &str) {435 let escaped = nixlike::serialize(s).expect("string");436 self.out.push('.');437 self.out.push_str(escaped.trim_end());438 }439}440441#[macro_export]442macro_rules! nix_expr_inner {443 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444 use $crate::better_nix_eval::NixExprBuilder;445 let mut out = NixExprBuilder::object();446 $(447 out.obj_key(448 NixExprBuilder::string(stringify!($ident)),449 $crate::nix_expr_inner!($($val)+),450 );451 )*452 out.end_obj();453 out454 }};455 (@field($o:ident) . $var:ident $($tt:tt)*) => {{456 $o.index_attr(stringify!($var));457 nix_expr_inner!(@field($o) $($tt)*);458 }};459 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460 $o.push(Index::attr(&$v));461 nix_expr_inner!(@o($o) $($tt)*);462 }};463 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465 nix_expr_inner!(@o($o) $($tt)*);466 }};467 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469 nix_expr_inner!(@o($o) $($tt)*);470 };471 (@field($o:ident)) => {};472 ($field:ident $($tt:tt)*) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 #[allow(unused_mut, reason = "might be used if indexed")]475 let mut out = NixExprBuilder::field($field.clone());476 nix_expr_inner!(@field(out) $($tt)*);477 out478 }};479 ($v:literal) => {{480 use $crate::better_nix_eval::NixExprBuilder;481 NixExprBuilder::string($v)482 }};483 ({$v:expr}) => {{484 use $crate::better_nix_eval::NixExprBuilder;485 NixExprBuilder::serialized(&$v)486 }}487}488#[macro_export]489macro_rules! nix_expr {490 ($($tt:tt)+) => {{491 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492 let expr = nix_expr_inner!($($tt)+);493 Field::new(expr.session(), expr.out)494 }};495}496497#[macro_export]498macro_rules! nix_go {499 (@o($o:ident) . $var:ident $($tt:tt)*) => {{500 $o.push(Index::attr(stringify!($var)));501 nix_go!(@o($o) $($tt)*);502 }};503 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504 $o.push(Index::attr(&$v));505 nix_go!(@o($o) $($tt)*);506 }};507 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509 nix_go!(@o($o) $($tt)*);510 }};511 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513 nix_go!(@o($o) $($tt)*);514 };515 (@o($o:ident)) => {};516 ($field:ident $($tt:tt)+) => {{517 use $crate::{nix_go, better_nix_eval::Index};518 let field = $field.clone();519 let mut out = vec![];520 nix_go!(@o(out) $($tt)*);521 field.select(out).await?522 }}523}524#[macro_export]525macro_rules! nix_go_json {526 ($($tt:tt)*) => {{527 $crate::nix_go!($($tt)*).as_json().await?528 }};529}530531#[derive(Clone)]532pub enum Index {533 Var(String),534 String(String),535 Apply(String),536 Expr(NixExprBuilder),537 ExprApply(NixExprBuilder),538}539impl Index {540 pub fn var(v: impl AsRef<str>) -> Self {541 let v = v.as_ref();542 assert!(543 !(v.contains('.') | v.contains(' ')),544 "bad variable name: {v}"545 );546 Self::Var(v.to_owned())547 }548 pub fn attr(v: impl AsRef<str>) -> Self {549 Self::String(v.as_ref().to_owned())550 }551 pub fn apply(v: impl Serialize) -> Self {552 let serialized = nixlike::serialize(v).expect("invalid value for apply");553 Self::Apply(serialized.trim_end().to_owned())554 }555}556impl Display for Index {557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558 match self {559 Index::Var(v) => {560 write!(f, "{v}")561 }562 Index::String(k) => {563 let v = nixlike::format_identifier(k.as_str());564 write!(f, ".{v}")565 }566 Index::Apply(o) => {567 write!(f, "<apply>({o})")568 }569 Index::Expr(e) => {570 write!(f, "[{}]", e.out)571 }572 Index::ExprApply(e) => {573 write!(f, "<apply>({})", e.out)574 }575 }576 }577}578impl fmt::Debug for Index {579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580 write!(f, "{self}")581 }582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 for i in self.0 {587 write!(f, "{i}")?;588 }589 Ok(())590 }591}592struct FieldInner {593 full_path: Option<Vec<Index>>,594 session: NixSession,595 value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598 if let Some(full_path) = &full_path {599 format!("full path: {}", PathDisplay(full_path))600 } else {601 format!("query: {query:?}")602 }603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607 fn root(session: NixSession) -> Self {608 Self(Arc::new(FieldInner {609 full_path: Some(vec![]),610 session,611 value: None,612 }))613 }614 async fn new(session: NixSession, query: &str) -> Result<Self> {615 let vid = session616 .0617 .lock()618 .await619 .execute_assign(query)620 .await621 .with_context(|| context(None, query))?;622 Ok(Self(Arc::new(FieldInner {623 full_path: None,624 session,625 value: Some(vid),626 })))627 }628 pub async fn field(session: NixSession, field: &str) -> Result<Self> {629 Self::root(session).select([Index::var(field)]).await630 }631 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {632 let mut used_fields = Vec::new();633 let mut name = name.into_iter();634635 let mut full_path = self.0.full_path.clone();636 let mut query = if let Some(id) = self.0.value {637 format!("sess_field_{id}")638 } else {639 let first = name.next();640 if let Some(Index::Var(i)) = first {641 if let Some(full_path) = &mut full_path {642 full_path.push(Index::Var(i.clone()));643 }644 i.clone()645 } else {646 panic!("first path item should be variable, got {first:?}")647 }648 };649 for v in name {650 if let Some(full_path) = &mut full_path {651 full_path.push(v.clone());652 }653 match v {654 Index::Var(_) => panic!("var item may only be first"),655 Index::String(s) => {656 let escaped = nixlike::serialize(s)?;657 query.push('.');658 query.push_str(escaped.trim());659 }660 Index::Apply(a) => {661 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`662 query = format!("({query} {a})");663 }664 Index::Expr(e) => {665 let index = Field::new(self.0.session.clone(), &e.out).await?;666 used_fields.push(index.clone());667 query.push('.');668 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));669 query.push_str(&index);670 }671 Index::ExprApply(e) => {672 let index = Field::new(self.0.session.clone(), &e.out).await?;673 used_fields.push(index.clone());674 query.push(' ');675 let index = format!("sess_field_{}", index.0.value.expect("value"));676 query.push_str(&index);677 query = format!("({query})");678 }679 }680 }681682 let vid = self683 .0684 .session685 .0686 .lock()687 .await688 .execute_assign(&query)689 .await690 .with_context(|| {691 if let Some(full_path) = &full_path {692 format!("full path: {}", PathDisplay(full_path))693 } else {694 format!("query: {query:?}")695 }696 })?;697 Ok(Self(Arc::new(FieldInner {698 full_path,699 session: self.0.session.clone(),700 value: Some(vid),701 })))702 }703 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {704 let id = self.0.value.expect("can't serialize root field");705 let query = format!("sess_field_{id}");706 self.0707 .session708 .0709 .lock()710 .await711 .execute_expression_to_json(&query)712 .await713 .with_context(|| context(self.0.full_path.as_deref(), &query))714 }715 pub async fn has_field(&self, name: &str) -> Result<bool> {716 let id = self.0.value.expect("can't list root fields");717 let key = nixlike::escape_string(name);718 let query = format!("sess_field_{id} ? {key}");719 self.0720 .session721 .0722 .lock()723 .await724 .execute_expression_to_json(&query)725 .await726 .with_context(|| context(self.0.full_path.as_deref(), &query))727 }728 pub async fn list_fields(&self) -> Result<Vec<String>> {729 let id = self.0.value.expect("can't list root fields");730 let query = format!("builtins.attrNames sess_field_{id}");731 self.0732 .session733 .0734 .lock()735 .await736 .execute_expression_to_json(&query)737 .await738 .with_context(|| context(self.0.full_path.as_deref(), &query))739 }740 pub async fn type_of(&self) -> Result<String> {741 let id = self.0.value.expect("can't list root fields");742 let query = format!("builtins.typeOf sess_field_{id}");743 self.0744 .session745 .0746 .lock()747 .await748 .execute_expression_to_json(&query)749 .await750 .with_context(|| context(self.0.full_path.as_deref(), &query))751 }752 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {753 let id = self.0.value.expect("can't use build on not-value");754 let query = format!(":b sess_field_{id}");755 let vid = self756 .0757 .session758 .0759 .lock()760 .await761 .execute_expression_raw(&query, &mut NixHandler::default())762 .await?;763 ensure!(764 !vid.is_empty(),765 "build failed: {}",766 context(self.0.full_path.as_deref(), &query),767 );768 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")769 else {770 panic!("unexpected build output: {vid:?}");771 };772 let outputs = vid773 .split('\n')774 .filter(|v| !v.is_empty())775 .map(|v| v.split_once(" -> ").expect("unexpected build output"))776 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))777 .collect();778 Ok(outputs)779 }780}781impl Drop for FieldInner {782 fn drop(&mut self) {783 if let Some(id) = self.value {784 if let Ok(mut lock) = self.session.0.try_lock() {785 lock.free_list.push(id)786 }787 // Leaked788 }789 }790}791struct NixSessionPoolInner {792 flake: OsString,793 nix_args: Vec<OsString>,794}795796#[derive(Debug)]797pub struct NixPoolError(anyhow::Error);798impl From<anyhow::Error> for NixPoolError {799 fn from(value: anyhow::Error) -> Self {800 Self(value)801 }802}803impl std::error::Error for NixPoolError {}804impl std::fmt::Display for NixPoolError {805 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {806 self.0.fmt(f)807 }808}809impl r2d2::ManageConnection for NixSessionPoolInner {810 type Connection = NixSessionInner;811 type Error = NixPoolError;812 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {813 let _v = TOKIO_RUNTIME814 .get()815 .expect("missed tokio runtime init!")816 .enter();817 Ok(futures::executor::block_on(NixSessionInner::new(818 self.flake.as_os_str(),819 self.nix_args.iter().map(OsString::as_os_str),820 ))?)821 }822823 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {824 let _v = TOKIO_RUNTIME825 .get()826 .expect("missed tokio runtime init!")827 .enter();828 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;829 if res != 4 {830 return Err(anyhow!("sanity check failed").into());831 };832 Ok(())833 }834835 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {836 false837 }838}839pub struct NixSessionPool(Pool<NixSessionPoolInner>);840impl NixSessionPool {841 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {842 let inner = tokio::task::block_in_place(|| {843 r2d2::Builder::<NixSessionPoolInner>::new()844 .min_idle(Some(0))845 .build(NixSessionPoolInner { flake, nix_args })846 })?;847 Ok(Self(inner))848 }849 pub async fn get(&self) -> Result<NixSession> {850 let v = tokio::task::block_in_place(|| self.0.get())?;851 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))852 }853}854855pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 // fn dedent(s: String) -> String {78 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79 // }80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 // We should receive errors earlier than synchronization131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 // Reader doesn't care about stdout, as this is cancelled.151 // Error still might be useful, to process leftover span closures?152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 // Standard repl hello doesn't work with internal-json logger191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 // Handle startup errors, but skip repl hello?202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self.execute_expression_string(fexpr).await?;303 Ok(serde_json::from_str(&v)?)304 }305 async fn execute_expression_wrapping(306 &mut self,307 expr: impl AsRef<[u8]>,308 wrapping: &(String, String),309 ) -> Result<String> {310 let mut nix_handler = self.nix_handler.clone();311 let mut collected = ErrorCollector::new(&mut nix_handler);312 let res = self.execute_expression_raw(expr, &mut collected).await?;313 if res.is_empty() {314 collected.finish()?;315 bail!("expected expression, got nothing")316 } else {317 collected.flush()318 };319 let Some(res) = res.strip_prefix(&wrapping.0) else {320 bail!("invalid type")321 };322 let Some(res) = res.strip_suffix(&wrapping.1) else {323 bail!("invalid type")324 };325 Ok(res.to_owned())326 }327 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {328 let mut nix_handler = self.nix_handler.clone();329 let mut collected = ErrorCollector::new(&mut nix_handler);330 let v = self.execute_expression_raw(expr, &mut collected).await?;331 collected.finish()?;332 ensure!(v.is_empty(), "unexpected expression result");333 Ok(())334 }335 async fn execute_expression_raw(336 &mut self,337 expr: impl AsRef<[u8]>,338 err_handler: &mut dyn Handler,339 ) -> Result<String> {340 // Prevent two commands from being executed in parallel, messing with each other.341 let _lock = self.executing_command.clone();342 let _guard = _lock.lock().await;343344 self.send_command(expr).await?;345 // It will be echoed346 self.send_command(REPL_DELIMITER).await?;347 self.read_until_delimiter(err_handler).await348 }349 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {350 let id = self.allocate_id();351 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))352 .await?;353 Ok(id)354 }355356 /// Id should be immediately used357 fn allocate_id(&mut self) -> u32 {358 if let Some(free) = self.free_list.pop() {359 free360 } else {361 let v = self.next_id;362 self.next_id += 1;363 v364 }365 }366 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.367 // async fn free_id(&mut self, id: u32) -> Result<()> {368 // self.execute_expression_empty(format!("sess_field_{id} = null"))369 // .await?;370 // self.free_list.push(id);371 // Ok(())372 // }373}374375#[derive(Clone)]376pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);377378#[derive(Clone)]379pub struct NixExprBuilder {380 out: String,381 used_fields: Vec<Field>,382}383impl NixExprBuilder {384 pub fn object() -> Self {385 NixExprBuilder {386 out: "{ ".to_owned(),387 used_fields: Vec::new(),388 }389 }390 pub fn string(s: &str) -> Self {391 NixExprBuilder {392 out: nixlike::serialize(s)393 .expect("no problems with serializing_string")394 .trim_end()395 .to_owned(),396 used_fields: Vec::new(),397 }398 }399 pub fn serialized(v: impl Serialize) -> Self {400 let serialized = nixlike::serialize(v).expect("invalid value for apply");401 Self {402 out: serialized.trim_end().to_owned(),403 used_fields: Vec::new(),404 }405 }406 pub fn field(f: Field) -> Self {407 Self {408 out: format!("sess_field_{}", f.0.value.expect("no value")),409 used_fields: vec![f],410 }411 }412 pub fn end_obj(&mut self) {413 self.out.push('}');414 }415 pub fn obj_key(&mut self, name: Self, value: Self) {416 self.out.push_str(r#""${"#);417 self.extend(name);418 self.out.push_str(r#"}" = "#);419 self.extend(value);420 self.out.push_str("; ");421 }422423 pub fn extend(&mut self, e: Self) {424 self.out.push_str(&e.out);425 self.used_fields.extend(e.used_fields);426 }427428 pub fn session(&self) -> NixSession {429 let mut session = None;430 for ele in &self.used_fields {431 if session.is_none() {432 session = Some(ele.0.session.clone());433 continue;434 }435 let session = &session.as_ref().expect("checked").0;436 let ele_sess = &ele.0.session.0;437 assert!(438 Arc::ptr_eq(session, ele_sess),439 "can't mix fields from different session"440 );441 }442 session.expect("expr without fields used")443 }444 pub fn index_attr(&mut self, s: &str) {445 let escaped = nixlike::serialize(s).expect("string");446 self.out.push('.');447 self.out.push_str(escaped.trim_end());448 }449}450451#[macro_export]452macro_rules! nix_expr_inner {453 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{454 use $crate::better_nix_eval::NixExprBuilder;455 let mut out = NixExprBuilder::object();456 $(457 out.obj_key(458 NixExprBuilder::string(stringify!($ident)),459 $crate::nix_expr_inner!($($val)+),460 );461 )*462 out.end_obj();463 out464 }};465 (@field($o:ident) . $var:ident $($tt:tt)*) => {{466 $o.index_attr(stringify!($var));467 nix_expr_inner!(@field($o) $($tt)*);468 }};469 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{470 $o.push(Index::attr(&$v));471 nix_expr_inner!(@o($o) $($tt)*);472 }};473 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{474 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));475 nix_expr_inner!(@o($o) $($tt)*);476 }};477 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {478 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));479 nix_expr_inner!(@o($o) $($tt)*);480 };481 (@field($o:ident)) => {};482 ($field:ident $($tt:tt)*) => {{483 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};484 #[allow(unused_mut, reason = "might be used if indexed")]485 let mut out = NixExprBuilder::field($field.clone());486 nix_expr_inner!(@field(out) $($tt)*);487 out488 }};489 ($v:literal) => {{490 use $crate::better_nix_eval::NixExprBuilder;491 NixExprBuilder::string($v)492 }};493 ({$v:expr}) => {{494 use $crate::better_nix_eval::NixExprBuilder;495 NixExprBuilder::serialized(&$v)496 }}497}498#[macro_export]499macro_rules! nix_expr {500 ($($tt:tt)+) => {{501 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};502 let expr = nix_expr_inner!($($tt)+);503 Field::new(expr.session(), expr.out)504 }};505}506507#[macro_export]508macro_rules! nix_go {509 (@o($o:ident) . $var:ident $($tt:tt)*) => {{510 $o.push(Index::attr(stringify!($var)));511 nix_go!(@o($o) $($tt)*);512 }};513 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{514 $o.push(Index::attr(&$v));515 nix_go!(@o($o) $($tt)*);516 }};517 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{518 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));519 nix_go!(@o($o) $($tt)*);520 }};521 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {522 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));523 nix_go!(@o($o) $($tt)*);524 };525 (@o($o:ident)) => {};526 ($field:ident $($tt:tt)+) => {{527 use $crate::{nix_go, better_nix_eval::Index};528 let field = $field.clone();529 let mut out = vec![];530 nix_go!(@o(out) $($tt)*);531 field.select(out).await?532 }}533}534#[macro_export]535macro_rules! nix_go_json {536 ($($tt:tt)*) => {{537 $crate::nix_go!($($tt)*).as_json().await?538 }};539}540541#[derive(Clone)]542pub enum Index {543 Var(String),544 String(String),545 Apply(String),546 Expr(NixExprBuilder),547 ExprApply(NixExprBuilder),548}549impl Index {550 pub fn var(v: impl AsRef<str>) -> Self {551 let v = v.as_ref();552 assert!(553 !(v.contains('.') | v.contains(' ')),554 "bad variable name: {v}"555 );556 Self::Var(v.to_owned())557 }558 pub fn attr(v: impl AsRef<str>) -> Self {559 Self::String(v.as_ref().to_owned())560 }561 pub fn apply(v: impl Serialize) -> Self {562 let serialized = nixlike::serialize(v).expect("invalid value for apply");563 Self::Apply(serialized.trim_end().to_owned())564 }565}566impl Display for Index {567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {568 match self {569 Index::Var(v) => {570 write!(f, "{v}")571 }572 Index::String(k) => {573 let v = nixlike::format_identifier(k.as_str());574 write!(f, ".{v}")575 }576 Index::Apply(o) => {577 write!(f, "<apply>({o})")578 }579 Index::Expr(e) => {580 write!(f, "[{}]", e.out)581 }582 Index::ExprApply(e) => {583 write!(f, "<apply>({})", e.out)584 }585 }586 }587}588impl fmt::Debug for Index {589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {590 write!(f, "{self}")591 }592}593struct PathDisplay<'i>(&'i [Index]);594impl Display for PathDisplay<'_> {595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {596 for i in self.0 {597 write!(f, "{i}")?;598 }599 Ok(())600 }601}602struct FieldInner {603 full_path: Option<Vec<Index>>,604 session: NixSession,605 value: Option<u32>,606}607fn context(full_path: Option<&[Index]>, query: &str) -> String {608 if let Some(full_path) = &full_path {609 format!("full path: {}", PathDisplay(full_path))610 } else {611 format!("query: {query:?}")612 }613}614#[derive(Clone)]615pub struct Field(Arc<FieldInner>);616impl Field {617 fn root(session: NixSession) -> Self {618 Self(Arc::new(FieldInner {619 full_path: Some(vec![]),620 session,621 value: None,622 }))623 }624 async fn new(session: NixSession, query: &str) -> Result<Self> {625 let vid = session626 .0627 .lock()628 .await629 .execute_assign(query)630 .await631 .with_context(|| context(None, query))?;632 Ok(Self(Arc::new(FieldInner {633 full_path: None,634 session,635 value: Some(vid),636 })))637 }638 pub async fn field(session: NixSession, field: &str) -> Result<Self> {639 Self::root(session).select([Index::var(field)]).await640 }641 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {642 let mut used_fields = Vec::new();643 let mut name = name.into_iter();644645 let mut full_path = self.0.full_path.clone();646 let mut query = if let Some(id) = self.0.value {647 format!("sess_field_{id}")648 } else {649 let first = name.next();650 if let Some(Index::Var(i)) = first {651 if let Some(full_path) = &mut full_path {652 full_path.push(Index::Var(i.clone()));653 }654 i.clone()655 } else {656 panic!("first path item should be variable, got {first:?}")657 }658 };659 for v in name {660 if let Some(full_path) = &mut full_path {661 full_path.push(v.clone());662 }663 match v {664 Index::Var(_) => panic!("var item may only be first"),665 Index::String(s) => {666 let escaped = nixlike::serialize(s)?;667 query.push('.');668 query.push_str(escaped.trim());669 }670 Index::Apply(a) => {671 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`672 query = format!("({query} {a})");673 }674 Index::Expr(e) => {675 let index = Field::new(self.0.session.clone(), &e.out).await?;676 used_fields.push(index.clone());677 query.push('.');678 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));679 query.push_str(&index);680 }681 Index::ExprApply(e) => {682 let index = Field::new(self.0.session.clone(), &e.out).await?;683 used_fields.push(index.clone());684 query.push(' ');685 let index = format!("sess_field_{}", index.0.value.expect("value"));686 query.push_str(&index);687 query = format!("({query})");688 }689 }690 }691692 let vid = self693 .0694 .session695 .0696 .lock()697 .await698 .execute_assign(&query)699 .await700 .with_context(|| {701 if let Some(full_path) = &full_path {702 format!("full path: {}", PathDisplay(full_path))703 } else {704 format!("query: {query:?}")705 }706 })?;707 Ok(Self(Arc::new(FieldInner {708 full_path,709 session: self.0.session.clone(),710 value: Some(vid),711 })))712 }713 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {714 let id = self.0.value.expect("can't serialize root field");715 let query = format!("sess_field_{id}");716 self.0717 .session718 .0719 .lock()720 .await721 .execute_expression_to_json(&query)722 .await723 .with_context(|| context(self.0.full_path.as_deref(), &query))724 }725 pub async fn has_field(&self, name: &str) -> Result<bool> {726 let id = self.0.value.expect("can't list root fields");727 let key = nixlike::escape_string(name);728 let query = format!("sess_field_{id} ? {key}");729 self.0730 .session731 .0732 .lock()733 .await734 .execute_expression_to_json(&query)735 .await736 .with_context(|| context(self.0.full_path.as_deref(), &query))737 }738 pub async fn list_fields(&self) -> Result<Vec<String>> {739 let id = self.0.value.expect("can't list root fields");740 let query = format!("builtins.attrNames sess_field_{id}");741 self.0742 .session743 .0744 .lock()745 .await746 .execute_expression_to_json(&query)747 .await748 .with_context(|| context(self.0.full_path.as_deref(), &query))749 }750 pub async fn type_of(&self) -> Result<String> {751 let id = self.0.value.expect("can't list root fields");752 let query = format!("builtins.typeOf sess_field_{id}");753 self.0754 .session755 .0756 .lock()757 .await758 .execute_expression_to_json(&query)759 .await760 .with_context(|| context(self.0.full_path.as_deref(), &query))761 }762 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {763 let id = self.0.value.expect("can't use build on not-value");764 let query = format!(":b sess_field_{id}");765 let vid = self766 .0767 .session768 .0769 .lock()770 .await771 .execute_expression_raw(&query, &mut NixHandler::default())772 .await?;773 ensure!(774 !vid.is_empty(),775 "build failed: {}",776 context(self.0.full_path.as_deref(), &query),777 );778 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")779 else {780 panic!("unexpected build output: {vid:?}");781 };782 let outputs = vid783 .split('\n')784 .filter(|v| !v.is_empty())785 .map(|v| v.split_once(" -> ").expect("unexpected build output"))786 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))787 .collect();788 Ok(outputs)789 }790}791impl Drop for FieldInner {792 fn drop(&mut self) {793 if let Some(id) = self.value {794 if let Ok(mut lock) = self.session.0.try_lock() {795 lock.free_list.push(id)796 }797 // Leaked798 }799 }800}801struct NixSessionPoolInner {802 flake: OsString,803 nix_args: Vec<OsString>,804}805806#[derive(Debug)]807pub struct NixPoolError(anyhow::Error);808impl From<anyhow::Error> for NixPoolError {809 fn from(value: anyhow::Error) -> Self {810 Self(value)811 }812}813impl std::error::Error for NixPoolError {}814impl std::fmt::Display for NixPoolError {815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {816 self.0.fmt(f)817 }818}819impl r2d2::ManageConnection for NixSessionPoolInner {820 type Connection = NixSessionInner;821 type Error = NixPoolError;822 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {823 let _v = TOKIO_RUNTIME824 .get()825 .expect("missed tokio runtime init!")826 .enter();827 Ok(futures::executor::block_on(NixSessionInner::new(828 self.flake.as_os_str(),829 self.nix_args.iter().map(OsString::as_os_str),830 ))?)831 }832833 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {834 let _v = TOKIO_RUNTIME835 .get()836 .expect("missed tokio runtime init!")837 .enter();838 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;839 if res != 4 {840 return Err(anyhow!("sanity check failed").into());841 };842 Ok(())843 }844845 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {846 false847 }848}849pub struct NixSessionPool(Pool<NixSessionPoolInner>);850impl NixSessionPool {851 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {852 let inner = tokio::task::block_in_place(|| {853 r2d2::Builder::<NixSessionPoolInner>::new()854 .min_idle(Some(0))855 .build(NixSessionPoolInner { flake, nix_args })856 })?;857 Ok(Self(inner))858 }859 pub async fn get(&self) -> Result<NixSession> {860 let v = tokio::task::block_in_place(|| self.0.get())?;861 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))862 }863}864865pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
- collections::HashMap,
- ffi::OsStr,
- pin,
- process::Stdio,
- sync::{Arc, Mutex},
- task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
use futures::StreamExt;
use itertools::Either;
-use once_cell::sync::Lazy;
use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
fn escape_bash(input: &str, out: &mut String) {
const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
return self;
}
let mut out = Self::new("env");
- if let Some(session) = self.ssh_session {
- out = out.ssh_session(session);
- }
+ out.ssh_session = self.ssh_session;
for (k, v) in self.env {
assert!(!k.contains('='));
out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
out
} else {
let mut out = Self::new("sudo");
+ out.ssh_session = self.ssh_session.take();
out.args(self.into_args());
out
}
}
- pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
- self.ssh_session = Some(on);
- self
- }
- pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
- let mut out = Self::new("ssh");
- out.ssh_session = self.ssh_session.take();
- out.arg(on).arg("--");
- out.arg(self.into_string());
- out
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
@@ -276,259 +256,6 @@
let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
assert!(v.is_none());
Ok(())
-}
-
-pub trait Handler: Send {
- fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
- fn clone(&self) -> Self {
- Self(self.0.clone())
- }
-}
-impl<H> ClonableHandler<H> {
- pub fn new(inner: H) -> Self {
- Self(Arc::new(Mutex::new(inner)))
- }
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
- fn handle_line(&mut self, e: &str) {
- self.0.lock().unwrap().handle_line(e)
- }
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
- fn handle_line(&mut self, e: &str) {
- info!(target: "log", "{e}");
- }
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
- fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
- spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
- static OSC_CLEANER: Lazy<Regex> =
- Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
- let m = OSC_CLEANER.replace_all(m, "");
- // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
- // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
- DETABBER.replace_all(m.as_ref(), " ").to_string()
-}
-impl Handler for NixHandler {
- fn handle_line(&mut self, e: &str) {
- if let Some(e) = e.strip_prefix("@nix ") {
- let log: NixLog = match serde_json::from_str(e) {
- Ok(l) => l,
- Err(err) => {
- warn!("failed to parse nix log line {:?}: {}", e, err);
- return;
- }
- };
- match log {
- NixLog::Msg { msg, raw_msg, .. } => {
- #[allow(clippy::nonminimal_bool)]
- if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
- && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
- && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
- if let Some(raw_msg) = raw_msg {
- if !msg.is_empty() {
- info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
- } else {
- info!(target: "nix", "{}", raw_msg.trim_end())
- }
- } else {
- info!(target: "nix", "{}", msg.trim_end())
- }
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 105 && !fields.is_empty() => {
- if let [LogField::String(drv), ..] = &fields[..] {
- let mut drv = drv.as_str();
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- info!(target: "nix","building {}", drv);
- let span = info_span!("build", drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad build log: {:?}", log)
- }
- }
- NixLog::Start {
- ref fields,
- typ,
- id,
- ..
- } if typ == 100 && fields.len() >= 3 => {
- if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
- &fields[..]
- {
- let mut drv = drv.as_str();
-
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- // info!(target: "nix","copying {} {} -> {}", drv, from, to);
- let span = info_span!("copy", from, to, drv);
- span.pb_start();
- self.spans.insert(id, span);
- } else {
- warn!("bad copy log: {:?}", log)
- }
- }
- NixLog::Start { text, typ, id, .. }
- if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
- {
- if !text.is_empty()
- && text != "querying info about missing paths"
- && text != "copying 0 paths"
- // Too much spam on lazy-trees branch
- && !(text.starts_with("copying '") && text.ends_with("' to the store"))
- {
- let span = info_span!("job");
- span.pb_start();
- span.pb_set_message(&process_message(text.trim()));
- self.spans.insert(id, span);
- info!(target: "nix", "{}", text);
- }
- }
- NixLog::Start {
- text,
- level: 0,
- typ: 108,
- ..
- } if text.is_empty() => {
- // Cache lookup? Coupled with copy log
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 109,
- ..
- } if text.starts_with("querying info about ") => {
- // Cache lookup
- }
- NixLog::Start {
- text,
- level: 4,
- typ: 101,
- ..
- } if text.starts_with("downloading ") => {
- // NAR downloading, coupled with copy log
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- ..
- } if text.starts_with("waiting for a machine to build ") => {
- // Useless repeating notification about build
- }
- NixLog::Start {
- text,
- level: 3,
- typ: 111,
- ..
- } if text.starts_with("resolved derivation: ") => {
- // CA resolved
- }
- NixLog::Start {
- text,
- level: 1,
- typ: 111,
- id,
- ..
- } if text.starts_with("waiting for lock on ") => {
- let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
- if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
- drv = txt;
- }
- if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
- drv = txt;
- }
- if let Some(txt) = drv.split("', '").next() {
- drv = txt;
- }
- if let Some(pkg) = drv.strip_prefix("/nix/store/") {
- let mut it = pkg.splitn(2, '-');
- it.next();
- if let Some(pkg) = it.next() {
- drv = pkg;
- }
- }
- let span = info_span!("waiting on drv", drv);
- span.pb_start();
- self.spans.insert(id, span);
- // Concurrent build of the same message
- }
- NixLog::Stop { id, .. } => {
- self.spans.remove(&id);
- }
- NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
- if let Some(span) = self.spans.get(&id) {
- if let LogField::String(s) = &fields[0] {
- span.pb_set_message(&process_message(s.trim()));
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- warn!("unknown result id: {id} {typ} {fields:?}");
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
- if let Some(span) = self.spans.get(&id) {
- if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
- &fields[..4]
- {
- span.pb_set_length(*expected);
- span.pb_set_position(*done);
- } else {
- warn!("bad fields: {fields:?}");
- }
- } else {
- // warn!("unknown result id: {id} {typ} {fields:?}");
- // Unaccounted progress.
- }
- // dbg!(fields, id, typ);
- }
- NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
- // Set phase, expected
- }
- _ => warn!("unknown log: {:?}", log),
- };
- } else {
- let e = e.trim();
- if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
- return;
- }
- info!("{e}")
- }
- }
}
async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
+ debug!("running command {cmd:?} on local");
let mut child = cmd.spawn()?;
let mut stderr = child.stderr.take().unwrap();
let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
) -> Result<Option<Vec<u8>>> {
+ debug!("running command {cmd:?} over ssh");
cmd.stderr(openssh::Stdio::piped());
cmd.stdout(openssh::Stdio::piped());
let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
}
Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
- /// Return true to discard message from logging
- fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
- String(String),
- Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
- fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- struct StringOrNum;
- impl<'de> Visitor<'de> for StringOrNum {
- type Value = LogField;
-
- fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "string or unsigned")
- }
-
- fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::String(v.to_owned()))
- }
-
- fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- Ok(LogField::Num(v))
- }
- }
-
- deserializer.deserialize_any(StringOrNum)
- }
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
- Msg {
- level: u32,
- msg: String,
- raw_msg: Option<String>,
- },
- Start {
- id: u64,
- level: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- text: String,
- #[serde(rename = "type")]
- typ: u32,
- },
- Stop {
- id: u64,
- },
- Result {
- id: u64,
- #[serde(rename = "type")]
- typ: u32,
- #[serde(default)]
- fields: Vec<LogField>,
- },
}
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}