From 966e7f167d68305e8e8c7704b8b73f4adc079d37 Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Sat, 25 May 2024 22:49:23 +0000 Subject: [PATCH] refactor: c bindings --- --- a/Cargo.lock +++ b/Cargo.lock @@ -198,9 +198,9 @@ [[package]] name = "anyhow" -version = "1.0.83" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" [[package]] name = "arc-swap" @@ -222,7 +222,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -374,9 +374,9 @@ [[package]] name = "cc" -version = "1.0.97" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099a5357d84c4c61eb35fc8eafa9a79a902c2f76911e5747ced4e032edd8d9b4" +checksum = "41c270e7540d725e65ac7f1b212ac8ce349719624d7bcff99f8e2e488e8cf03f" [[package]] name = "cfg-if" @@ -468,7 +468,7 @@ "heck", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -587,6 +587,7 @@ "cfg-if", "cpufeatures", "curve25519-dalek-derive", + "digest", "fiat-crypto", "platforms", "rustc_version", @@ -602,7 +603,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -658,14 +659,39 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek", + "ed25519", + "rand_core", + "serde", + "sha2", + "subtle", + "zeroize", ] [[package]] name = "either" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" [[package]] name = "encode_unicode" @@ -894,7 +920,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -928,6 +954,18 @@ ] [[package]] +name = "generator-helper" +version = "0.1.0" +dependencies = [ + "age", + "anyhow", + "clap", + "ed25519-dalek", + "fleet-shared", + "rand", +] + +[[package]] name = "generic-array" version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1033,7 +1071,7 @@ "serde", "serde_derive", "thiserror", - "toml 0.8.12", + "toml 0.8.13", "unic-langid", ] @@ -1075,7 +1113,7 @@ "proc-macro2", "quote", "strsim", - "syn 2.0.63", + "syn 2.0.66", "unic-langid", ] @@ -1089,7 +1127,7 @@ "i18n-config", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -1151,9 +1189,9 @@ [[package]] name = "instant" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" dependencies = [ "cfg-if", ] @@ -1241,9 +1279,9 @@ [[package]] name = "libc" -version = "0.2.154" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae743338b92ff9146ce83992f766a31066a91a8c84a45e0e9f21e7cf6de6d346" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libm" @@ -1253,9 +1291,9 @@ [[package]] name = "libmimalloc-sys" -version = "0.1.37" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81eb4061c0582dedea1cbc7aff2240300dd6982e0239d1c99e65c1dbf4a30ba7" +checksum = "0e7bb23d733dfcc8af652a78b7bf232f0e967710d044732185e561e47c0336b6" dependencies = [ "cc", "libc", @@ -1269,9 +1307,9 @@ [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "lock_api" @@ -1321,9 +1359,9 @@ [[package]] name = "mimalloc" -version = "0.1.41" +version = "0.1.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f41a2280ded0da56c8cf898babb86e8f10651a34adcfff190ae9a1159c6908d" +checksum = "e9186d86b79b52f4a77af65604b51225e8db1d6ee7e3f41aec1e40829c71a176" dependencies = [ "libmimalloc-sys", ] @@ -1336,9 +1374,9 @@ [[package]] name = "miniz_oxide" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" +checksum = "87dfd01fe195c66b572b37921ad8803d010623c0aca821bea2302239d155cdae" dependencies = [ "adler", ] @@ -1533,9 +1571,9 @@ [[package]] name = "parking_lot" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", @@ -1608,7 +1646,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -1717,9 +1755,9 @@ [[package]] name = "proc-macro2" -version = "1.0.82" +version = "1.0.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" +checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6" dependencies = [ "unicode-ident", ] @@ -1911,7 +1949,7 @@ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.63", + "syn 2.0.66", "walkdir", ] @@ -2041,9 +2079,9 @@ [[package]] name = "serde" -version = "1.0.202" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] @@ -2059,13 +2097,13 @@ [[package]] name = "serde_derive" -version = "1.0.202" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -2081,9 +2119,9 @@ [[package]] name = "serde_spanned" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" dependencies = [ "serde", ] @@ -2245,9 +2283,9 @@ [[package]] name = "syn" -version = "2.0.63" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf5be731623ca1a1fb7d8be6f261a3be6d3e2337b8a1f97be944d020c8fcb704" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", @@ -2308,22 +2346,22 @@ [[package]] name = "thiserror" -version = "1.0.60" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "579e9083ca58dd9dcf91a9923bb9054071b9ebbd800b342194c9feb0ee89fc18" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.60" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2470041c06ec3ac1ab38d0356a6119054dedaea53e12fbefc0de730a1c08524" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -2401,7 +2439,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -2438,9 +2476,9 @@ [[package]] name = "toml" -version = "0.8.12" +version = "0.8.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3" +checksum = "a4e43f8cc456c9704c851ae29c67e17ef65d2c30017c17a9765b89c382dc8bba" dependencies = [ "serde", "serde_spanned", @@ -2450,18 +2488,18 @@ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.22.12" +version = "0.22.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" +checksum = "c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c" dependencies = [ "indexmap", "serde", @@ -2489,7 +2527,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] [[package]] @@ -2708,7 +2746,7 @@ "once_cell", "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", "wasm-bindgen-shared", ] @@ -2730,7 +2768,7 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2964,5 +3002,5 @@ dependencies = [ "proc-macro2", "quote", - "syn 2.0.63", + "syn 2.0.66", ] --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,27 @@ [workspace.dependencies] nixlike = { path = "./crates/nixlike" } better-command = { path = "./crates/better-command" } -bifrostlink = "0.1.0" -uuid = { version = "1.7.0", features = ["v4"] } -tokio = { version = "1.36.0", features = ["fs", "rt", "macros", "sync", "time", "rt-multi-thread"] } fleet-shared = { path = "./crates/fleet-shared" } +tokio = { version = "1.36.0", features = [ + "fs", + "rt", + "macros", + "sync", + "time", + "rt-multi-thread", +] } +# Using fixed version for rust on stable nixos branches. +clap = { version = ">=4.4, <4.5", features = [ + "derive", + "env", + "wrap_help", + "unicode", +] } +age = { version = "0.10", features = ["ssh"] } +anyhow = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tempfile = "3.10" +nix = {version = "0.27.1", features = ["user", "fs"]} --- a/cmds/fleet/Cargo.toml +++ b/cmds/fleet/Cargo.toml @@ -9,27 +9,21 @@ nixlike.workspace = true better-command.workspace = true tokio.workspace = true -anyhow = "1.0" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +clap.workspace = true +age = { workspace = true, features = ["armor"] } +anyhow.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +serde.workspace = true +serde_json.workspace = true +tempfile.workspace = true time = { version = "0.3", features = ["serde"] } -tempfile = "3.10" once_cell = "1.19" hostname = "0.3" age-core = "0.10" peg = "0.8" -age = { version = "0.10", features = ["ssh", "armor"] } base64 = "0.22.1" chrono = { version = "0.4", features = ["serde"] } -# Using fixed version for rust on stable nixos branches. -clap = { version = ">=4.4, <4.5", features = [ - "derive", - "env", - "wrap_help", - "unicode", -] } -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] } tokio-util = { version = "0.7", features = ["codec"] } async-trait = "0.1" futures = "0.3" @@ -40,9 +34,7 @@ "supports-color", "supports-colors", ] } -r2d2 = "0.8.10" abort-on-drop = "0.2" -unindent = "0.2" regex = "1.10" openssh = "0.10" crossterm = { version = "0.27.0", features = ["use-dev-tty"] } @@ -51,12 +43,13 @@ tracing-indicatif = { version = "0.3", optional = true } human-repr = { version = "1.1", optional = true } indicatif = { version = "0.17", optional = true } +nix-eval = { version = "0.1.0", path = "../../crates/nix-eval" } [features] # Not quite stable indicatif = [ - "tracing-indicatif", + "dep:tracing-indicatif", "dep:indicatif", - "human-repr", + "dep:human-repr", "better-command/indicatif", ] --- a/cmds/fleet/src/better_nix_eval.rs +++ b/cmds/fleet/src/better_nix_eval.rs @@ -12,892 +12,14 @@ use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler}; use futures::StreamExt; use itertools::Itertools; -use r2d2::{Pool, PooledConnection}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command}; use tokio::select; use tokio::sync::{mpsc, oneshot, Mutex}; -use tokio_util::codec::{FramedRead, LinesCodec}; use tracing::{debug, error, warn, Level}; - -const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\""; - -pub struct NixSessionInner { - full_delimiter: String, - nix_handler: ClonableHandler, - out: OutputHandler, - stdin: ChildStdin, - string_wrapping: (String, String), - number_wrapping: (String, String), - - executing_command: Arc>, - - next_id: u32, - free_list: Vec, -} -const TRAIN_STRING: &str = "\"TRAIN_STRING\""; -const TRAIN_NUMBER: &str = "13141516"; - -#[must_use] -struct ErrorCollector<'i, H> { - collected: Vec, - inner: &'i mut H, -} -impl<'i, H> ErrorCollector<'i, H> { - fn new(inner: &'i mut H) -> Self { - Self { - collected: vec![], - inner, - } - } -} -impl ErrorCollector<'_, H> { - fn handle_line_inner(&mut self, msg: &str) -> bool { - let Some(msg) = msg.strip_prefix("@nix ") else { - return false; - }; - #[derive(Deserialize)] - struct ErrorAction { - action: String, - level: u32, - msg: String, - } - let Ok(act) = serde_json::from_str::(msg) else { - return false; - }; - if act.action != "msg" || act.level != 0 { - return false; - } - self.collected.push(act.msg); - true - } - fn finish(self) -> Result<()> { - // fn dedent(s: String) -> String { - // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.) - // } - if !self.collected.is_empty() { - bail!( - "{}", - self.collected - .iter() - .map(|v| { - if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") { - let v = unindent::unindent(f.trim_start()); - v.trim().to_owned() - } else { - v.to_owned() - } - }) - .join("\n") - ); - } - Ok(()) - } - fn flush(self) { - for line in self.collected { - warn!("{line}"); - } - } -} -impl Handler for ErrorCollector<'_, H> { - fn handle_line(&mut self, e: &str) { - if self.handle_line_inner(e) { - return; - } - self.inner.handle_line(e) - } -} - -enum OutputLine { - Out(String), - Err(String), -} -struct OutputHandler { - rx: mpsc::Receiver, - _cancel_handle: oneshot::Receiver<()>, -} -impl OutputHandler { - fn new(out: ChildStdout, err: ChildStderr) -> Self { - let mut out = FramedRead::new(out, LinesCodec::new()); - let mut err = FramedRead::new(err, LinesCodec::new()); - let (tx, rx) = mpsc::channel(20); - let (mut cancelled, _cancel_handle) = oneshot::channel(); - tokio::spawn(async move { - loop { - select! { - // We should receive errors earlier than synchronization - biased; - e = err.next() => { - let Some(Ok(e)) = e else { - if e.is_some() { - error!("bad repl stderr: {e:?}"); - } - continue; - }; - let _ = tx.send(OutputLine::Err(e)).await; - } - o = out.next() => { - let Some(Ok(o)) = o else { - if o.is_some() { - error!("bad repl stdout: {o:?}"); - } - continue; - }; - let _ = tx.send(OutputLine::Out(o)).await; - } - // Reader doesn't care about stdout, as this is cancelled. - // Error still might be useful, to process leftover span closures? - _ = cancelled.closed() => { - break; - } - } - } - }); - Self { rx, _cancel_handle } - } - async fn next(&mut self) -> Option { - self.rx.recv().await - } -} - -struct WarnHandler; -impl Handler for WarnHandler { - fn handle_line(&mut self, e: &str) { - warn!(target: "nix", "{e}") - } -} - -impl NixSessionInner { - async fn new(flake: &OsStr, extra_args: impl IntoIterator) -> Result { - let mut cmd = Command::new("nix"); - cmd.arg("repl") - .arg(flake) - .arg("--log-format") - .arg("internal-json"); - for arg in extra_args { - cmd.arg(arg); - } - cmd.stdin(Stdio::piped()); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); - let cmd = cmd.spawn()?; - let stdout = cmd.stdout.unwrap(); - let stderr = cmd.stderr.unwrap(); - let mut out = OutputHandler::new(stdout, stderr); - let mut stdin = cmd.stdin.unwrap(); - // Standard repl hello doesn't work with internal-json logger - stdin.write_all(REPL_DELIMITER.as_bytes()).await?; - stdin.write_all(b"\n").await?; - stdin.flush().await?; - let nix_handler = NixHandler::default(); - let mut full_delimiter = None; - let mut errors = vec![]; - while let Some(line) = out.next().await { - let line = match line { - OutputLine::Out(o) => o, - OutputLine::Err(_e) => { - // Handle startup errors, but skip repl hello? - errors.push(_e); - continue; - } - }; - if line.contains(REPL_DELIMITER) { - debug!("discovered repl delimiter with added colors: {line}"); - full_delimiter = Some(line.to_owned()); - break; - } - } - let Some(full_delimiter) = full_delimiter else { - for e in errors { - error!("{e}"); - } - bail!("failed to discover delimiter"); - }; - let mut res = Self { - full_delimiter, - nix_handler: ClonableHandler::new(nix_handler), - out, - stdin, - string_wrapping: Default::default(), - number_wrapping: Default::default(), - - executing_command: Arc::new(Mutex::new(())), - - next_id: 0, - free_list: vec![], - }; - res.train().await?; - Ok(res) - } - async fn train(&mut self) -> Result<()> { - { - let full_string = self - .execute_expression_raw(TRAIN_STRING, &mut NoopHandler) - .await?; - let string_offset = full_string.find(TRAIN_STRING).expect("contained"); - let string_prefix = &full_string[..string_offset]; - let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..]; - self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned()); - } - { - let full_number = self - .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler) - .await?; - let number_offset = full_number.find(TRAIN_NUMBER).expect("contained"); - let number_prefix = &full_number[..number_offset]; - let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..]; - self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned()); - } - Ok(()) - } - async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> { - if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() { - let cmd_str = String::from_utf8_lossy(cmd.as_ref()); - tracing::debug!("{cmd_str}"); - }; - self.stdin.write_all(cmd.as_ref()).await?; - self.stdin.write_all(b"\n").await?; - Ok(()) - } - async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result { - let mut out = String::new(); - while let Some(line) = self.out.next().await { - let line = match line { - OutputLine::Out(out) => out, - OutputLine::Err(err) => { - err_handler.handle_line(&err); - continue; - } - }; - if line == self.full_delimiter { - return Ok(out); - } - if !out.is_empty() { - out.push('\n'); - } - out.push_str(&line); - } - bail!("didn't reached delimiter"); - } - async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result { - let num = self.number_wrapping.clone(); - let n = self.execute_expression_wrapping(expr, &num).await?; - Ok(n.parse::()?) - } - async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result { - let num = self.string_wrapping.clone(); - let n = self.execute_expression_wrapping(expr, &num).await?; - let str: String = serde_json::from_str(&n)?; - Ok(str) - } - async fn execute_expression_to_json( - &mut self, - expr: impl AsRef<[u8]>, - ) -> Result { - let mut fexpr = b"builtins.toJSON (".to_vec(); - fexpr.extend_from_slice(expr.as_ref()); - fexpr.push(b')'); - let v = self - .execute_expression_string(fexpr) - .await - .context("string expression")?; - serde_json::from_str(&v).context("json parse") - } - async fn execute_expression_wrapping( - &mut self, - expr: impl AsRef<[u8]>, - wrapping: &(String, String), - ) -> Result { - let mut nix_handler = self.nix_handler.clone(); - let mut collected = ErrorCollector::new(&mut nix_handler); - let res = self.execute_expression_raw(expr, &mut collected).await?; - if res.is_empty() { - collected.finish()?; - bail!("expected expression, got nothing") - } else { - collected.flush() - }; - let Some(res) = res.strip_prefix(&wrapping.0) else { - bail!("invalid type") - }; - let Some(res) = res.strip_suffix(&wrapping.1) else { - bail!("invalid type") - }; - Ok(res.to_owned()) - } - async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> { - let mut nix_handler = self.nix_handler.clone(); - let mut collected = ErrorCollector::new(&mut nix_handler); - let v = self.execute_expression_raw(expr, &mut collected).await?; - collected.finish()?; - ensure!(v.is_empty(), "unexpected expression result"); - Ok(()) - } - async fn execute_expression_raw( - &mut self, - expr: impl AsRef<[u8]>, - err_handler: &mut dyn Handler, - ) -> Result { - // Prevent two commands from being executed in parallel, messing with each other. - let _lock = self.executing_command.clone(); - let _guard = _lock.lock().await; - - self.send_command(expr).await?; - // It will be echoed - self.send_command(REPL_DELIMITER).await?; - self.read_until_delimiter(err_handler).await - } - async fn execute_assign(&mut self, expr: impl AsRef) -> Result { - let id = self.allocate_id(); - self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref())) - .await?; - Ok(id) - } - - /// Id should be immediately used - fn allocate_id(&mut self) -> u32 { - if let Some(free) = self.free_list.pop() { - free - } else { - let v = self.next_id; - self.next_id += 1; - v - } - } - // Nix has no way to deallocate variable, yet GC will correct everything not reachable. - // async fn free_id(&mut self, id: u32) -> Result<()> { - // self.execute_expression_empty(format!("sess_field_{id} = null")) - // .await?; - // self.free_list.push(id); - // Ok(()) - // } -} - -#[derive(Clone)] -pub struct NixSession(Arc>>); - -#[derive(Clone)] -pub struct NixExprBuilder { - out: String, - used_fields: Vec, -} -impl NixExprBuilder { - pub fn object() -> Self { - NixExprBuilder { - out: "{ ".to_owned(), - used_fields: Vec::new(), - } - } - pub fn string(s: &str) -> Self { - NixExprBuilder { - out: nixlike::serialize(s) - .expect("no problems with serializing_string") - .trim_end() - .to_owned(), - used_fields: Vec::new(), - } - } - pub fn serialized(v: impl Serialize) -> Self { - let serialized = nixlike::serialize(v).expect("invalid value for apply"); - Self { - out: serialized.trim_end().to_owned(), - used_fields: Vec::new(), - } - } - pub fn field(f: Field) -> Self { - Self { - out: format!("sess_field_{}", f.0.value.expect("no value")), - used_fields: vec![f], - } - } - pub fn end_obj(&mut self) { - self.out.push('}'); - } - pub fn obj_key(&mut self, name: Self, value: Self) { - self.out.push_str(r#""${"#); - self.extend(name); - self.out.push_str(r#"}" = "#); - self.extend(value); - self.out.push_str("; "); - } - - pub fn extend(&mut self, e: Self) { - self.out.push_str(&e.out); - self.used_fields.extend(e.used_fields); - } - - #[allow(dead_code)] - pub fn session(&self) -> NixSession { - let mut session = None; - for ele in &self.used_fields { - if session.is_none() { - session = Some(ele.0.session.clone()); - continue; - } - let session = &session.as_ref().expect("checked").0; - let ele_sess = &ele.0.session.0; - assert!( - Arc::ptr_eq(session, ele_sess), - "can't mix fields from different session" - ); - } - session.expect("expr without fields used") - } - #[allow(dead_code)] - pub fn index_attr(&mut self, s: &str) { - let escaped = nixlike::serialize(s).expect("string"); - self.out.push('.'); - self.out.push_str(escaped.trim_end()); - } -} - -#[macro_export] -macro_rules! nix_expr_inner { - //(@munch_object FIXME: value should be arbitrary nix_expr_inner input... Time to write proc-macro? - (@obj($o:ident) $field:ident, $($tt:tt)*) => {{ - $o.obj_key( - NixExprBuilder::string(stringify!($field)), - NixExprBuilder::field($field), - ); - nix_expr_inner!(@obj($o) $($tt)*); - }}; - (@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{ - $o.obj_key( - NixExprBuilder::string(stringify!($field)), - NixExprBuilder::serialized(&$v), - ); - nix_expr_inner!(@obj($o) $($tt)*); - }}; - (@obj($o:ident)) => {{}}; - (Obj { $($tt:tt)* }) => {{ - use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner}; - let mut out = NixExprBuilder::object(); - nix_expr_inner!(@obj(out) $($tt)*); - out.end_obj(); - out - }}; - (@field($o:ident) . $var:ident $($tt:tt)*) => {{ - $o.index_attr(stringify!($var)); - nix_expr_inner!(@field($o) $($tt)*); - }}; - (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{ - $o.push(Index::attr(&$v)); - nix_expr_inner!(@o($o) $($tt)*); - }}; - (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{ - $o.push(Index::Expr($crate::nix_expr_inner!($($var)+))); - nix_expr_inner!(@o($o) $($tt)*); - }}; - (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => { - $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+))); - nix_expr_inner!(@o($o) $($tt)*); - }; - (@field($o:ident)) => {}; - ($field:ident $($tt:tt)*) => {{ - use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner}; - #[allow(unused_mut, reason = "might be used if indexed")] - let mut out = NixExprBuilder::field($field.clone()); - nix_expr_inner!(@field(out) $($tt)*); - out - }}; - ($v:literal) => {{ - use $crate::better_nix_eval::NixExprBuilder; - NixExprBuilder::string($v) - }}; - ({$v:expr}) => {{ - use $crate::better_nix_eval::NixExprBuilder; - NixExprBuilder::serialized(&$v) - }} -} -#[macro_export] -macro_rules! nix_expr { - ($($tt:tt)+) => {{ - use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner}; - let expr = nix_expr_inner!($($tt)+); - Field::new(expr.session(), expr.out) - }}; -} -#[macro_export] -macro_rules! nix_go { - (@o($o:ident) . $var:ident $($tt:tt)*) => {{ - $o.push(Index::attr(stringify!($var))); - nix_go!(@o($o) $($tt)*); - }}; - (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{ - $o.push(Index::attr(&$v)); - nix_go!(@o($o) $($tt)*); - }}; - (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{ - $o.push(Index::Expr($crate::nix_expr_inner!($($var)+))); - nix_go!(@o($o) $($tt)*); - }}; - (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => { - $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+))); - nix_go!(@o($o) $($tt)*); - }; - (@o($o:ident) | $($var:tt)*) => { - $o.push(Index::Pipe($crate::nix_expr_inner!($($var)+))); - }; - (@o($o:ident)) => {}; - ($field:ident $($tt:tt)+) => {{ - use $crate::{nix_go, better_nix_eval::Index}; - let field = $field.clone(); - let mut out = vec![]; - nix_go!(@o(out) $($tt)*); - field.select(out).await? - }} -} -#[macro_export] -macro_rules! nix_go_json { - ($($tt:tt)*) => {{ - $crate::nix_go!($($tt)*).as_json().await? - }}; -} -#[derive(Clone)] -pub enum Index { - Var(String), - String(String), - #[allow(dead_code)] - Apply(String), - #[allow(dead_code)] - Expr(NixExprBuilder), - ExprApply(NixExprBuilder), - Pipe(NixExprBuilder), -} -impl Index { - pub fn var(v: impl AsRef) -> Self { - let v = v.as_ref(); - assert!( - !(v.contains('.') | v.contains(' ')), - "bad variable name: {v}" - ); - Self::Var(v.to_owned()) - } - pub fn attr(v: impl AsRef) -> Self { - Self::String(v.as_ref().to_owned()) - } - #[allow(dead_code)] - pub fn apply(v: impl Serialize) -> Self { - let serialized = nixlike::serialize(v).expect("invalid value for apply"); - Self::Apply(serialized.trim_end().to_owned()) - } -} -impl Display for Index { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Index::Var(v) => { - write!(f, "{v}") - } - Index::String(k) => { - let v = nixlike::format_identifier(k.as_str()); - write!(f, ".{v}") - } - Index::Apply(o) => { - write!(f, "({o})") - } - Index::Expr(e) => { - write!(f, "[{}]", e.out) - } - Index::ExprApply(e) => { - write!(f, "({})", e.out) - } - Index::Pipe(e) => { - write!(f, "({})", e.out) - } - } - } -} -impl fmt::Debug for Index { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{self}") - } -} -struct PathDisplay<'i>(&'i [Index]); -impl Display for PathDisplay<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for i in self.0 { - write!(f, "{i}")?; - } - Ok(()) - } -} -struct FieldInner { - full_path: Option>, - session: NixSession, - value: Option, -} -fn context(op: &str, full_path: Option<&[Index]>, query: &str) -> String { - if let Some(full_path) = &full_path { - format!("on {op}, full path: {}", PathDisplay(full_path)) - } else { - format!("query: {query:?}") - } -} -#[derive(Clone)] -pub struct Field(Arc); -impl Field { - fn root(session: NixSession) -> Self { - Self(Arc::new(FieldInner { - full_path: Some(vec![]), - session, - value: None, - })) - } - async fn new(session: NixSession, query: &str) -> Result { - let vid = session - .0 - .lock() - .await - .execute_assign(query) - .await - .with_context(|| context("new root", None, query))?; - Ok(Self(Arc::new(FieldInner { - full_path: None, - session, - value: Some(vid), - }))) - } - pub async fn field(session: NixSession, field: &str) -> Result { - Self::root(session).select([Index::var(field)]).await - } - pub async fn select<'a>(&self, name: impl IntoIterator) -> Result { - let mut used_fields = Vec::new(); - let mut name = name.into_iter(); - let mut full_path = self.0.full_path.clone(); - let mut query = if let Some(id) = self.0.value { - format!("sess_field_{id}") - } else { - let first = name.next(); - if let Some(Index::Var(i)) = first { - if let Some(full_path) = &mut full_path { - full_path.push(Index::Var(i.clone())); - } - i.clone() - } else { - panic!("first path item should be variable, got {first:?}") - } - }; - for v in name { - if let Some(full_path) = &mut full_path { - full_path.push(v.clone()); - } - match v { - Index::Var(_) => panic!("var item may only be first"), - Index::String(s) => { - let escaped = nixlike::serialize(s)?; - query.push('.'); - query.push_str(escaped.trim()); - } - Index::Apply(a) => { - // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()` - query = format!("({query} {a})"); - } - Index::Expr(e) => { - let index = Field::new(self.0.session.clone(), &e.out).await?; - used_fields.push(index.clone()); - query.push('.'); - let index = format!("${{sess_field_{}}}", index.0.value.expect("value")); - query.push_str(&index); - } - Index::ExprApply(e) => { - let index = Field::new(self.0.session.clone(), &e.out).await?; - used_fields.push(index.clone()); - query.push(' '); - let index = format!("sess_field_{}", index.0.value.expect("value")); - query.push_str(&index); - query = format!("({query})"); - } - Index::Pipe(v) => { - let index = Field::new(self.0.session.clone(), &v.out).await?; - used_fields.push(index.clone()); - let index = format!("sess_field_{}", index.0.value.expect("value")); - query = format!("({index} {query})"); - } - } - } - let vid = self - .0 - .session - .0 - .lock() - .await - .execute_assign(&query) - .await - .with_context(|| { - if let Some(full_path) = &full_path { - format!("full path: {}", PathDisplay(full_path)) - } else { - format!("query: {query:?}") - } - })?; - Ok(Self(Arc::new(FieldInner { - full_path, - session: self.0.session.clone(), - value: Some(vid), - }))) - } - pub async fn as_json(&self) -> Result { - let id = self.0.value.expect("can't serialize root field"); - let query = format!("sess_field_{id}"); - self.0 - .session - .0 - .lock() - .await - .execute_expression_to_json(&query) - .await - .with_context(|| context("as_json", self.0.full_path.as_deref(), &query)) - } - #[allow(dead_code)] - pub async fn has_field(&self, name: &str) -> Result { - let id = self.0.value.expect("can't list root fields"); - let key = nixlike::escape_string(name); - let query = format!("sess_field_{id} ? {key}"); - self.0 - .session - .0 - .lock() - .await - .execute_expression_to_json(&query) - .await - .with_context(|| context("has_field", self.0.full_path.as_deref(), &query)) - } - pub async fn list_fields(&self) -> Result> { - let id = self.0.value.expect("can't list root fields"); - let query = format!("builtins.attrNames sess_field_{id}"); - self.0 - .session - .0 - .lock() - .await - .execute_expression_to_json(&query) - .await - .with_context(|| context("list field", self.0.full_path.as_deref(), &query)) - } - pub async fn type_of(&self) -> Result { - let id = self.0.value.expect("can't list root fields"); - let query = format!("builtins.typeOf sess_field_{id}"); - self.0 - .session - .0 - .lock() - .await - .execute_expression_to_json(&query) - .await - .with_context(|| context("type_of", self.0.full_path.as_deref(), &query)) - } - #[allow(dead_code)] - pub async fn import(&self) -> Result { - let import = Self::new(self.0.session.clone(), "import").await?; - Ok(nix_go!(self | import)) - } - pub async fn build(&self) -> Result> { - let id = self.0.value.expect("can't use build on not-value"); - let query = format!(":b sess_field_{id}"); - let vid = self - .0 - .session - .0 - .lock() - .await - .execute_expression_raw(&query, &mut NixHandler::default()) - .await?; - ensure!( - !vid.is_empty(), - "build failed: {}", - context("build", self.0.full_path.as_deref(), &query), - ); - let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n") - else { - panic!("unexpected build output: {vid:?}"); - }; - let outputs = vid - .split('\n') - .filter(|v| !v.is_empty()) - .map(|v| v.split_once(" -> ").expect("unexpected build output")) - .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b))) - .collect(); - Ok(outputs) - } -} -impl Drop for FieldInner { - fn drop(&mut self) { - if let Some(id) = self.value { - if let Ok(mut lock) = self.session.0.try_lock() { - lock.free_list.push(id) - } - // Leaked - } - } -} -struct NixSessionPoolInner { - flake: OsString, - nix_args: Vec, -} - -#[derive(Debug)] -pub struct NixPoolError(anyhow::Error); -impl From for NixPoolError { - fn from(value: anyhow::Error) -> Self { - Self(value) - } -} -impl std::error::Error for NixPoolError {} -impl std::fmt::Display for NixPoolError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} -impl r2d2::ManageConnection for NixSessionPoolInner { - type Connection = NixSessionInner; - type Error = NixPoolError; - fn connect(&self) -> std::result::Result { - let _v = TOKIO_RUNTIME - .get() - .expect("missed tokio runtime init!") - .enter(); - Ok(futures::executor::block_on(NixSessionInner::new( - self.flake.as_os_str(), - self.nix_args.iter().map(OsString::as_os_str), - ))?) - } - - fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { - let _v = TOKIO_RUNTIME - .get() - .expect("missed tokio runtime init!") - .enter(); - let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?; - if res != 4 { - return Err(anyhow!("sanity check failed").into()); - }; - Ok(()) - } - - fn has_broken(&self, _conn: &mut Self::Connection) -> bool { - false - } -} -pub struct NixSessionPool(Pool); -impl NixSessionPool { - pub async fn new(flake: OsString, nix_args: Vec) -> Result { - let inner = tokio::task::block_in_place(|| { - r2d2::Builder::::new() - .min_idle(Some(0)) - .build(NixSessionPoolInner { flake, nix_args }) - })?; - Ok(Self(inner)) - } - pub async fn get(&self) -> Result { - let v = tokio::task::block_in_place(|| self.0.get())?; - Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v)))) - } -} - -pub static TOKIO_RUNTIME: OnceLock = OnceLock::new(); --- a/cmds/fleet/src/cmds/build_systems.rs +++ b/cmds/fleet/src/cmds/build_systems.rs @@ -4,10 +4,10 @@ use crate::command::MyCommand; use crate::host::{Config, ConfigHost}; -use crate::nix_go; use anyhow::{anyhow, Result}; use clap::{Parser, ValueEnum}; use itertools::Itertools as _; +use nix_eval::nix_go; use tokio::{task::LocalSet, time::sleep}; use tracing::{error, field, info, info_span, warn, Instrument}; --- a/cmds/fleet/src/cmds/info.rs +++ b/cmds/fleet/src/cmds/info.rs @@ -1,9 +1,9 @@ use std::collections::BTreeSet; use crate::host::Config; -use crate::nix_go_json; use anyhow::{ensure, Result}; use clap::Parser; +use nix_eval::nix_go_json; #[derive(Parser)] pub struct Info { --- a/cmds/fleet/src/cmds/secrets/mod.rs +++ b/cmds/fleet/src/cmds/secrets/mod.rs @@ -11,6 +11,7 @@ use crossterm::{terminal, tty::IsTty}; use fleet_shared::SecretData; use itertools::Itertools; +use nix_eval::{nix_go, nix_go_json, Value}; use owo_colors::OwoColorize; use serde::Deserialize; use tabled::{Table, Tabled}; @@ -18,10 +19,8 @@ use tracing::{error, info, info_span, warn, Instrument}; use crate::{ - better_nix_eval::Field, fleetdata::{encrypt_secret_data, FleetSecret, FleetSecretPart, FleetSharedSecret}, host::Config, - nix_go, nix_go_json, }; #[derive(Parser)] @@ -130,7 +129,7 @@ secret_name: &str, config: &Config, mut secret: FleetSharedSecret, - field: Field, + field: Value, updated_set: &[String], prefer_identities: &[String], ) -> Result { @@ -197,8 +196,8 @@ async fn generate_pure( _config: &Config, _display_name: &str, - _secret: Field, - _default_generator: Field, + _secret: Value, + _default_generator: Value, _owners: &[String], ) -> Result { bail!("pure generators are broken for now") @@ -206,8 +205,8 @@ async fn generate_impure( config: &Config, _display_name: &str, - secret: Field, - default_generator: Field, + secret: Value, + default_generator: Value, owners: &[String], ) -> Result { let generator = nix_go!(secret.generator); @@ -289,7 +288,7 @@ async fn generate( config: &Config, display_name: &str, - secret: Field, + secret: Value, owners: &[String], ) -> Result { let generator = nix_go!(secret.generator); @@ -332,7 +331,7 @@ async fn generate_shared( config: &Config, display_name: &str, - secret: Field, + secret: Value, expected_owners: Vec, ) -> Result { // let owners: Vec = nix_go_json!(secret.expectedOwners); --- a/cmds/fleet/src/host.rs +++ b/cmds/fleet/src/host.rs @@ -12,15 +12,14 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use clap::{ArgGroup, Parser}; use fleet_shared::SecretData; +use nix_eval::{nix_go, nix_go_json, NixSessionPool, Value}; use openssh::SessionBuilder; use serde::de::DeserializeOwned; use tempfile::NamedTempFile; use crate::{ - better_nix_eval::{Field, NixSessionPool}, command::MyCommand, fleetdata::{FleetData, FleetSecret, FleetSharedSecret}, - nix_go, nix_go_json, }; pub struct FleetConfigInternals { @@ -30,12 +29,12 @@ pub data: Mutex, pub nix_args: Vec, /// fleet_config.config - pub config_field: Field, + pub config_field: Value, /// fleet_config.unchecked.config - pub config_unchecked_field: Field, + pub config_unchecked_field: Value, /// import nixpkgs {system = local}; - pub default_pkgs: Field, + pub default_pkgs: Value, } #[derive(Clone)] @@ -55,7 +54,7 @@ pub local: bool, pub session: OnceLock>, - pub nixos_config: Option, + pub nixos_config: Option, } impl ConfigHost { async fn open_session(&self) -> Result> { @@ -201,7 +200,7 @@ } Ok(out) } - pub async fn secret_field(&self, name: &str) -> Result { + pub async fn secret_field(&self, name: &str) -> Result { let Some(nixos) = &self.nixos_config else { bail!("host is virtual and has no secrets"); }; @@ -209,7 +208,7 @@ } /// Packages for this host, resolved with nixpkgs overlays - pub async fn pkgs(&self) -> Result { + pub async fn pkgs(&self) -> Result { let Some(nixos) = &self.nixos_config else { return Ok(self.config.default_pkgs.clone()); }; @@ -261,7 +260,7 @@ } Ok(out) } - pub async fn system_config(&self, host: &str) -> Result { + pub async fn system_config(&self, host: &str) -> Result { let fleet_field = &self.config_unchecked_field; Ok(nix_go!(fleet_field.hosts[{ host }].nixosSystem.config)) } @@ -275,7 +274,7 @@ /// Shared secrets configured in fleet.nix or in flake pub async fn list_configured_shared(&self) -> Result> { let config_field = &self.config_unchecked_field; - nix_go!(config_field.sharedSecrets).list_fields().await + Ok(nix_go!(config_field.sharedSecrets).list_fields().await?) } /// Shared secrets configured in fleet.nix pub fn list_shared(&self) -> Vec { @@ -389,13 +388,13 @@ let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?; let root_field = pool.get().await?; - let builtins_field = Field::field(root_field.clone(), "builtins").await?; + let builtins_field = Value::binding(root_field.clone(), "builtins").await?; if self.local_system == "detect" { self.local_system = nix_go_json!(builtins_field.currentSystem); } let local_system = self.local_system.clone(); - let fleet_root = Field::field(root_field, "fleetConfigurations").await?; + let fleet_root = Value::binding(root_field, "fleetConfigurations").await?; let fleet_field = nix_go!(fleet_root.default); let config_field = nix_go!(fleet_field.config); --- a/cmds/fleet/src/main.rs +++ b/cmds/fleet/src/main.rs @@ -1,5 +1,5 @@ #![recursion_limit = "512"] -#![feature(try_blocks, lint_reasons)] +#![feature(try_blocks)] pub(crate) mod cmds; pub(crate) mod command; @@ -173,6 +173,8 @@ setup_logging(); if let Err(e) = main_real().await { // If I remove this line, the next error!() line gets eaten. + // This is a bug in indicatif, it needs to be fixed + #[cfg(feature = "indicatif")] info!("fixme: this line gets eaten by tracing-indicatif on levels info+"); error!("{e:#}"); return ExitCode::FAILURE; @@ -181,7 +183,7 @@ } async fn main_real() -> Result<()> { - let _ = better_nix_eval::TOKIO_RUNTIME.set(tokio::runtime::Handle::current()); + nix_eval::init_tokio(); let nix_args = std::env::var_os("NIX_ARGS") .map(|a| extra_args::parse_os(&a)) --- /dev/null +++ b/cmds/generator-helper/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "fleet-generator-helper" +edition = "2021" +version.workspace = true + +[dependencies] +age.workspace = true +anyhow.workspace = true +clap.workspace = true +ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } +fleet-shared.workspace = true +rand = "0.8.5" --- /dev/null +++ b/cmds/generator-helper/src/main.rs @@ -0,0 +1,204 @@ +use std::{ + fs, + io::{self, stdout, Cursor, Read, Write}, + path::PathBuf, + str::FromStr, +}; + +use age::Recipient; +use anyhow::{anyhow, bail, ensure, Context, Result}; +use clap::Parser; +use ed25519_dalek::SigningKey; +use fleet_shared::SecretData; +use rand::{ + distributions::{Alphanumeric, DistString, Distribution, Uniform}, + rngs::OsRng, + thread_rng, Rng, +}; + +fn write_output(out: &str, data: impl AsRef<[u8]>, stdout_marker: &mut bool) -> Result<()> { + let data = data.as_ref(); + if out == "-" { + let mut stdout = stdout(); + if *stdout_marker { + stdout.write_all(&[b'\n'])?; + } + *stdout_marker = true; + stdout.write_all(data)?; + } else { + fs::write(out, data)?; + }; + Ok(()) +} + +#[derive(Parser)] +enum Generate { + /// Generate public, private keys without wrapping, in standard ed25519 schema + /// (64 bytes private (due to merge with private), 32 bytes public) + Ed25519 { + public: String, + private: String, + /// Private key should be just the private key (32 bytes), not standard private+public. + #[arg(long)] + no_embed_public: bool, + }, + Password { + output: String, + size: usize, + #[arg(long, short = 'n')] + no_symbols: bool, + }, +} + +#[derive(Parser)] +enum Opts { + /// Encode public part from stdin. + Public { + #[arg(long)] + allow_empty: bool, + }, + /// Encrypt private part from stdin. + Private { + #[arg(long)] + allow_empty: bool, + #[arg(short = 'r')] + recipient: Vec, + }, + /// Generate keys in well-known schemas. + /// + /// Note that this command is only intended to be used in fleet secret generator, + /// otherwise you should ensure noone is able to read generated files, they don't have any mode set by default. + #[command(subcommand)] + Generate(Generate), + // Generate { + // kind: GenerateKind, + // /// Different generators generate different number of files, you need to specify number of outputs corresponding to the generator. + // #[arg(short = 'o')] + // outputs: Vec, + // }, +} + +fn parse_stdin() -> Result>> { + let mut input = vec![]; + io::stdin().read_to_end(&mut input)?; + if input.is_empty() { + Ok(None) + } else { + Ok(Some(input)) + } +} +pub fn encrypt_secret_data( + recipients: impl IntoIterator, + data: Vec, +) -> Option { + let mut encrypted = vec![]; + let recipients = recipients + .into_iter() + .map(|v| Box::new(v) as Box) + .collect::>(); + let mut encryptor = age::Encryptor::with_recipients(recipients)? + .wrap_output(&mut encrypted) + .expect("in memory write"); + io::copy(&mut Cursor::new(data), &mut encryptor).expect("in memory copy"); + encryptor.finish().expect("in memory flush"); + Some(SecretData { + data: encrypted, + encrypted: true, + }) +} + +fn main() -> Result<()> { + let opts = Opts::parse(); + // Assumed to be secure, seeded from secure OsRng+reseeded. + let mut rng = thread_rng(); + + match opts { + Opts::Public { allow_empty } => { + let stdin = parse_stdin()?; + if stdin.is_none() && !allow_empty { + bail!("empty stdin input is not allowed unless --allow-empty is set"); + } + let stdin = stdin.unwrap_or_default(); + io::stdout().write_all( + SecretData { + data: stdin, + encrypted: false, + } + .to_string() + .as_bytes(), + )?; + } + Opts::Private { + allow_empty, + recipient, + } => { + let stdin = parse_stdin()?; + if stdin.is_none() && !allow_empty { + bail!("empty stdin input is not allowed unless --allow-empty is set"); + } + let stdin = stdin.unwrap_or_default(); + if recipient.is_empty() { + bail!("recipient list is empty"); + } + let out = encrypt_secret_data( + recipient + .into_iter() + .map(|r| age::ssh::Recipient::from_str(&r)) + .collect::, age::ssh::ParseRecipientKeyError>>() + .map_err(|e| anyhow!("parse recipients: {e:?}"))?, + stdin, + ) + .expect("got recipients"); + io::stdout().write_all(out.to_string().as_bytes())?; + } + Opts::Generate(gen) => { + let mut stdout_marker: bool = false; + match gen { + Generate::Ed25519 { + public, + private, + no_embed_public, + } => { + let key = SigningKey::generate(&mut rng).to_keypair_bytes(); + + write_output(&public, &key[32..], &mut stdout_marker).context("public")?; + write_output( + &private, + &key[..{ + if no_embed_public { + 32 + } else { + 64 + } + }], + &mut stdout_marker, + ) + .context("private")?; + } + Generate::Password { + size, + no_symbols, + output, + } => { + ensure!( + size >= 6, + "misconfiguration? password is shorter than 6 chars" + ); + let out = if no_symbols { + Alphanumeric.sample_string(&mut rng, size) + } else { + // Alphabet of Alphanumberic + symbols + const GEN_ASCII_SYMBOLS: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"; + let uniform = Uniform::new(0, GEN_ASCII_SYMBOLS.len()); + (0..size) + .map(|_| uniform.sample(&mut rng)) + .map(|i| GEN_ASCII_SYMBOLS[i] as char) + .collect::() + }; + write_output(&output, out, &mut stdout_marker)?; + } + } + } + } + Ok(()) +} --- a/cmds/install-secrets/Cargo.toml +++ b/cmds/install-secrets/Cargo.toml @@ -6,18 +6,13 @@ [dependencies] -age = { version = "0.10.0", features = ["ssh"] } -anyhow = "1.0.79" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tracing = "0.1" -nix = {version = "0.27.1", features = ["user", "fs"]} -serde = { version = "1.0.196", features = ["derive"] } -serde_json = "1.0.113" -clap = { version = ">=4.4, <4.5", features = [ - "derive", - "env", - "wrap_help", - "unicode", -] } -tempfile = "3.10.0" +clap.workspace = true fleet-shared.workspace = true +age.workspace = true +anyhow.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +serde.workspace = true +serde_json.workspace = true +tempfile.workspace = true +nix.workspace = true --- a/cmds/remowt-agent/Cargo.toml +++ b/cmds/remowt-agent/Cargo.toml @@ -6,3 +6,5 @@ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +iroh-net = "0.17.0" +tracing.workspace = true --- /dev/null +++ b/crates/nix-eval/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "nix-eval" +edition = "2021" +version.workspace = true + +[dependencies] +better-command.workspace = true +futures = "0.3.30" +itertools = "0.13.0" +nixlike.workspace = true +r2d2 = "0.8.10" +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +thiserror = "1.0.61" +tokio = { workspace = true, features = ["process", "io-util"] } +tokio-util = { version = "0.7.11", features = ["codec"] } +tracing.workspace = true +unindent = "0.2.3" --- /dev/null +++ b/crates/nix-eval/src/lib.rs @@ -0,0 +1,32 @@ +//! This whole library should be replaced with either binding to nix libexpr, +//! or with tvix (once it is able to build NixOS). +//! +//! Current api is awful, little effort was put into this implementation. + +use std::sync::Arc; + +pub use pool::NixSessionPool; +use pool::NixSessionPoolInner; +use r2d2::PooledConnection; +pub use session::{Error, Result}; +pub use value::{Index, Value}; + +mod pool; +mod session; +mod value; +// Contains macros helpers +#[doc(hidden)] +pub mod macros; + +#[derive(Clone)] +pub struct NixSession(pub(crate) Arc>>); + +impl NixSession { + fn ptr_eq(a: &Self, b: &Self) -> bool { + Arc::ptr_eq(&a.0, &b.0) + } +} + +pub fn init_tokio() { + let _ = pool::TOKIO_RUNTIME.set(tokio::runtime::Handle::current()); +} --- /dev/null +++ b/crates/nix-eval/src/macros.rs @@ -0,0 +1,183 @@ +use serde::Serialize; + +use crate::{NixSession, Value}; + +#[derive(Clone)] +pub struct NixExprBuilder { + pub(crate) out: String, + used_fields: Vec, +} +impl NixExprBuilder { + pub fn object() -> Self { + NixExprBuilder { + out: "{ ".to_owned(), + used_fields: Vec::new(), + } + } + pub fn string(s: &str) -> Self { + NixExprBuilder { + out: nixlike::serialize(s) + .expect("no problems with serializing_string") + .trim_end() + .to_owned(), + used_fields: Vec::new(), + } + } + pub fn serialized(v: impl Serialize) -> Self { + let serialized = nixlike::serialize(v).expect("invalid value for apply"); + Self { + out: serialized.trim_end().to_owned(), + used_fields: Vec::new(), + } + } + pub fn value(f: Value) -> Self { + Self { + out: format!("sess_field_{}", f.session_field_id()), + used_fields: vec![f], + } + } + pub fn end_obj(&mut self) { + self.out.push('}'); + } + pub fn obj_key(&mut self, name: Self, value: Self) { + self.out.push_str(r#""${"#); + self.extend(name); + self.out.push_str(r#"}" = "#); + self.extend(value); + self.out.push_str("; "); + } + + pub fn extend(&mut self, e: Self) { + self.out.push_str(&e.out); + self.used_fields.extend(e.used_fields); + } + + #[allow(dead_code)] + pub fn session(&self) -> NixSession { + let mut session = None; + for ele in &self.used_fields { + if session.is_none() { + session = Some(ele.session()); + continue; + } + let session = session.as_ref().expect("checked"); + let ele_sess = ele.session(); + assert!( + NixSession::ptr_eq(session, &ele_sess), + "can't mix fields from different session" + ); + } + session.expect("expr without fields used") + } + #[allow(dead_code)] + pub fn index_attr(&mut self, s: &str) { + let escaped = nixlike::serialize(s).expect("string"); + self.out.push('.'); + self.out.push_str(escaped.trim_end()); + } +} + +#[macro_export] +macro_rules! nix_expr_inner { + //(@munch_object FIXME: value should be arbitrary nix_expr_inner input... Time to write proc-macro? + (@obj($o:ident) $field:ident, $($tt:tt)*) => {{ + $o.obj_key( + NixExprBuilder::string(stringify!($field)), + NixExprBuilder::value($field), + ); + nix_expr_inner!(@obj($o) $($tt)*); + }}; + (@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{ + $o.obj_key( + NixExprBuilder::string(stringify!($field)), + NixExprBuilder::serialized(&$v), + ); + nix_expr_inner!(@obj($o) $($tt)*); + }}; + (@obj($o:ident)) => {{}}; + (Obj { $($tt:tt)* }) => {{ + use $crate::{macros::NixExprBuilder, nix_expr_inner}; + let mut out = NixExprBuilder::object(); + nix_expr_inner!(@obj(out) $($tt)*); + out.end_obj(); + out + }}; + (@field($o:ident) . $var:ident $($tt:tt)*) => {{ + $o.index_attr(stringify!($var)); + nix_expr_inner!(@field($o) $($tt)*); + }}; + (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{ + $o.push(Index::attr(&$v)); + nix_expr_inner!(@o($o) $($tt)*); + }}; + (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{ + $o.push(Index::Expr($crate::nix_expr_inner!($($var)+))); + nix_expr_inner!(@o($o) $($tt)*); + }}; + (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => { + $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+))); + nix_expr_inner!(@o($o) $($tt)*); + }; + (@field($o:ident)) => {}; + ($field:ident $($tt:tt)*) => {{ + use $crate::{macros::NixExprBuilder, nix_expr_inner}; + // might be used if indexed + #[allow(unused_mut)] + let mut out = NixExprBuilder::value($field.clone()); + nix_expr_inner!(@field(out) $($tt)*); + out + }}; + ($v:literal) => {{ + use $crate::macros::NixExprBuilder; + NixExprBuilder::string($v) + }}; + ({$v:expr}) => {{ + use $crate::macros::NixExprBuilder; + NixExprBuilder::serialized(&$v) + }} +} +#[macro_export] +macro_rules! nix_expr { + ($($tt:tt)+) => {{ + use $crate::{macros::{NixExprBuilder}, Value, nix_expr_inner}; + let expr = nix_expr_inner!($($tt)+); + Field::new(expr.session(), expr.out) + }}; +} + +#[macro_export] +macro_rules! nix_go { + (@o($o:ident) . $var:ident $($tt:tt)*) => {{ + $o.push(Index::attr(stringify!($var))); + nix_go!(@o($o) $($tt)*); + }}; + (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{ + $o.push(Index::attr(&$v)); + nix_go!(@o($o) $($tt)*); + }}; + (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{ + $o.push(Index::Expr($crate::nix_expr_inner!($($var)+))); + nix_go!(@o($o) $($tt)*); + }}; + (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => { + $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+))); + nix_go!(@o($o) $($tt)*); + }; + (@o($o:ident) | $($var:tt)*) => { + $o.push(Index::Pipe($crate::nix_expr_inner!($($var)+))); + }; + (@o($o:ident)) => {}; + ($field:ident $($tt:tt)+) => {{ + use $crate::{nix_go, Index}; + let field = $field.clone(); + let mut out = vec![]; + nix_go!(@o(out) $($tt)*); + field.select(out).await? + }} +} +#[macro_export] +macro_rules! nix_go_json { + ($($tt:tt)*) => {{ + $crate::nix_go!($($tt)*).as_json().await? + }}; +} --- /dev/null +++ b/crates/nix-eval/src/pool.rs @@ -0,0 +1,61 @@ +use std::ffi::OsString; +use std::sync::{Arc, OnceLock}; + +use r2d2::Pool; + +use crate::session::NixSessionInner; +use crate::{Error, NixSession, Result}; + +pub struct NixSessionPool(Pool); +impl NixSessionPool { + pub async fn new(flake: OsString, nix_args: Vec) -> Result { + let inner = tokio::task::block_in_place(|| { + r2d2::Builder::::new() + .min_idle(Some(0)) + .build(NixSessionPoolInner { flake, nix_args }) + })?; + Ok(Self(inner)) + } + pub async fn get(&self) -> Result { + let v = tokio::task::block_in_place(|| self.0.get())?; + Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v)))) + } +} + +pub(crate) struct NixSessionPoolInner { + flake: OsString, + nix_args: Vec, +} + +impl r2d2::ManageConnection for NixSessionPoolInner { + type Connection = NixSessionInner; + type Error = Error; + fn connect(&self) -> std::result::Result { + let _v = TOKIO_RUNTIME + .get() + .expect("missed tokio runtime init!") + .enter(); + Ok(futures::executor::block_on(NixSessionInner::new( + self.flake.as_os_str(), + self.nix_args.iter().map(OsString::as_os_str), + ))?) + } + + fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { + let _v = TOKIO_RUNTIME + .get() + .expect("missed tokio runtime init!") + .enter(); + let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?; + if res != 4 { + // just in case, should fail much earlier + return Err(Error::SessionInit("misbehaving session")); + }; + Ok(()) + } + + fn has_broken(&self, _conn: &mut Self::Connection) -> bool { + false + } +} +pub static TOKIO_RUNTIME: OnceLock = OnceLock::new(); --- /dev/null +++ b/crates/nix-eval/src/session.rs @@ -0,0 +1,415 @@ +use std::{ffi::OsStr, num::ParseIntError, process::Stdio, sync::Arc}; + +use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler}; +use futures::StreamExt; +use itertools::Itertools as _; +use serde::{de::DeserializeOwned, Deserialize}; +use thiserror::Error; +use tokio::{ + io::AsyncWriteExt, + process::{ChildStderr, ChildStdin, ChildStdout, Command}, + select, + sync::{mpsc, oneshot, Mutex}, +}; +use tokio_util::codec::{FramedRead, LinesCodec}; +use tracing::{debug, error, warn, Level}; + +#[derive(Error, Debug)] +pub enum Error { + #[error("failed to create nix repl session: {0}")] + SessionInit(&'static str), + #[error("unexpected end of output, nix crashed?")] + MissingDelimiter, + + #[error("expression did'nt produce any output")] + ExpectedOutput, + #[error("expression produced output, which is unexpected")] + UnexpectedOutput, + + #[error("unexpected expression output type")] + InvalidType, + + #[error("failed to build attr {attribute}:\n{error}")] + BuildFailed { attribute: String, error: String }, + + #[error("output: {0}")] + Json(#[from] serde_json::Error), + // int outputs are too specific, and should not be used, + // thus error is ok to be not informative. + #[error("int output: {0}")] + Int(ParseIntError), + #[error("pool: {0}")] + Pool(#[from] r2d2::Error), + #[error("io: {0}")] + Io(#[from] std::io::Error), + + // TODO: Should be done by wrapper/in different type. + #[error("at {0}: {1}")] + InContext(String, Box), + + #[error("error: {0}")] + NixError(String), +} +impl Error { + pub(crate) fn context(self, context: String) -> Self { + Self::InContext(context, Box::new(self)) + } +} +pub type Result = std::result::Result; + +enum OutputLine { + Out(String), + Err(String), +} +struct OutputHandler { + rx: mpsc::Receiver, + _cancel_handle: oneshot::Receiver<()>, +} +impl OutputHandler { + fn new(out: ChildStdout, err: ChildStderr) -> Self { + let mut out = FramedRead::new(out, LinesCodec::new()); + let mut err = FramedRead::new(err, LinesCodec::new()); + let (tx, rx) = mpsc::channel(20); + let (mut cancelled, _cancel_handle) = oneshot::channel(); + tokio::spawn(async move { + loop { + select! { + // We should receive errors earlier than synchronization + biased; + e = err.next() => { + let Some(Ok(e)) = e else { + if e.is_some() { + error!("bad repl stderr: {e:?}"); + } + continue; + }; + let _ = tx.send(OutputLine::Err(e)).await; + } + o = out.next() => { + let Some(Ok(o)) = o else { + if o.is_some() { + error!("bad repl stdout: {o:?}"); + } + continue; + }; + let _ = tx.send(OutputLine::Out(o)).await; + } + // Reader doesn't care about stdout, as this is cancelled. + // Error still might be useful, to process leftover span closures? + _ = cancelled.closed() => { + break; + } + } + } + }); + Self { rx, _cancel_handle } + } + async fn next(&mut self) -> Option { + self.rx.recv().await + } +} + +#[must_use] +struct ErrorCollector<'i, H> { + collected: Vec, + inner: &'i mut H, +} +impl<'i, H> ErrorCollector<'i, H> { + fn new(inner: &'i mut H) -> Self { + Self { + collected: vec![], + inner, + } + } +} +impl ErrorCollector<'_, H> { + fn handle_line_inner(&mut self, msg: &str) -> bool { + let Some(msg) = msg.strip_prefix("@nix ") else { + return false; + }; + #[derive(Deserialize)] + struct ErrorAction { + action: String, + level: u32, + msg: String, + } + let Ok(act) = serde_json::from_str::(msg) else { + return false; + }; + if act.action != "msg" || act.level != 0 { + return false; + } + self.collected.push(act.msg); + true + } + fn finish(self) -> Result<()> { + // fn dedent(s: String) -> String { + // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.) + // } + if !self.collected.is_empty() { + return Err(Error::NixError(format!( + "{}", + self.collected + .iter() + .map(|v| { + if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") { + let v = unindent::unindent(f.trim_start()); + v.trim().to_owned() + } else { + v.to_owned() + } + }) + .join("\n"), + ))); + } + Ok(()) + } + fn flush(self) { + for line in self.collected { + warn!("{line}"); + } + } +} +impl Handler for ErrorCollector<'_, H> { + fn handle_line(&mut self, e: &str) { + if self.handle_line_inner(e) { + return; + } + self.inner.handle_line(e) + } +} + +pub struct NixSessionInner { + full_delimiter: String, + nix_handler: ClonableHandler, + out: OutputHandler, + stdin: ChildStdin, + string_wrapping: (String, String), + number_wrapping: (String, String), + + executing_command: Arc>, + + next_id: u32, + pub(crate) free_list: Vec, +} + +/// Discover inter-message repl delimiter +const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\""; +/// Discover formatting around strings +const TRAIN_STRING: &str = "\"TRAIN_STRING\""; +/// Discover formatting around numbers +const TRAIN_NUMBER: &str = "13141516"; +// Other types of formatting are not discovered, because they are not used, JSON serialization is used instead +// Techically, number training is also not required, because numbers can be converted to string too... +// Eh, I'll remove it later. + +impl NixSessionInner { + pub(crate) async fn new( + flake: &OsStr, + extra_args: impl IntoIterator, + ) -> Result { + let mut cmd = Command::new("nix"); + cmd.arg("repl") + .arg(flake) + .arg("--log-format") + .arg("internal-json"); + for arg in extra_args { + cmd.arg(arg); + } + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + let cmd = cmd.spawn()?; + let stdout = cmd.stdout.unwrap(); + let stderr = cmd.stderr.unwrap(); + let mut out = OutputHandler::new(stdout, stderr); + let mut stdin = cmd.stdin.unwrap(); + // Standard repl hello doesn't work with internal-json logger + stdin.write_all(REPL_DELIMITER.as_bytes()).await?; + stdin.write_all(b"\n").await?; + stdin.flush().await?; + let nix_handler = NixHandler::default(); + let mut full_delimiter = None; + let mut errors = vec![]; + while let Some(line) = out.next().await { + let line = match line { + OutputLine::Out(o) => o, + OutputLine::Err(_e) => { + // Handle startup errors, but skip repl hello? + errors.push(_e); + continue; + } + }; + if line.contains(REPL_DELIMITER) { + debug!("discovered repl delimiter with added colors: {line}"); + full_delimiter = Some(line.to_owned()); + break; + } + } + let Some(full_delimiter) = full_delimiter else { + for e in errors { + error!("{e}"); + } + return Err(Error::SessionInit("failed to discover delimiter")); + }; + let mut res = Self { + full_delimiter, + nix_handler: ClonableHandler::new(nix_handler), + out, + stdin, + string_wrapping: Default::default(), + number_wrapping: Default::default(), + + executing_command: Arc::new(Mutex::new(())), + + next_id: 0, + free_list: vec![], + }; + res.train().await?; + Ok(res) + } + async fn train(&mut self) -> Result<()> { + { + let full_string = self + .execute_expression_raw(TRAIN_STRING, &mut NoopHandler) + .await?; + let string_offset = full_string.find(TRAIN_STRING).expect("contained"); + let string_prefix = &full_string[..string_offset]; + let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..]; + self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned()); + } + { + let full_number = self + .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler) + .await?; + let number_offset = full_number.find(TRAIN_NUMBER).expect("contained"); + let number_prefix = &full_number[..number_offset]; + let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..]; + self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned()); + } + Ok(()) + } + async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> { + if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() { + let cmd_str = String::from_utf8_lossy(cmd.as_ref()); + tracing::debug!("{cmd_str}"); + }; + self.stdin.write_all(cmd.as_ref()).await?; + self.stdin.write_all(b"\n").await?; + Ok(()) + } + async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result { + let mut out = String::new(); + while let Some(line) = self.out.next().await { + let line = match line { + OutputLine::Out(out) => out, + OutputLine::Err(err) => { + err_handler.handle_line(&err); + continue; + } + }; + if line == self.full_delimiter { + return Ok(out); + } + if !out.is_empty() { + out.push('\n'); + } + out.push_str(&line); + } + return Err(Error::MissingDelimiter); + } + pub(crate) async fn execute_expression_number( + &mut self, + expr: impl AsRef<[u8]>, + ) -> Result { + let num = self.number_wrapping.clone(); + let n = self.execute_expression_wrapping(expr, &num).await?; + n.parse::().map_err(Error::Int) + } + async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result { + let num = self.string_wrapping.clone(); + let n = self.execute_expression_wrapping(expr, &num).await?; + let str: String = serde_json::from_str(&n)?; + Ok(str) + } + pub(crate) async fn execute_expression_to_json( + &mut self, + expr: impl AsRef<[u8]>, + ) -> Result { + let mut fexpr = b"builtins.toJSON (".to_vec(); + fexpr.extend_from_slice(expr.as_ref()); + fexpr.push(b')'); + let v = self.execute_expression_string(fexpr).await?; + Ok(serde_json::from_str(&v)?) + } + async fn execute_expression_wrapping( + &mut self, + expr: impl AsRef<[u8]>, + wrapping: &(String, String), + ) -> Result { + let mut nix_handler = self.nix_handler.clone(); + let mut collected = ErrorCollector::new(&mut nix_handler); + let res = self.execute_expression_raw(expr, &mut collected).await?; + if res.is_empty() { + collected.finish()?; + return Err(Error::ExpectedOutput); + } else { + collected.flush() + }; + let Some(res) = res.strip_prefix(&wrapping.0) else { + return Err(Error::InvalidType); + }; + let Some(res) = res.strip_suffix(&wrapping.1) else { + return Err(Error::InvalidType); + }; + Ok(res.to_owned()) + } + async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> { + let mut nix_handler = self.nix_handler.clone(); + let mut collected = ErrorCollector::new(&mut nix_handler); + let v = self.execute_expression_raw(expr, &mut collected).await?; + collected.finish()?; + if !v.is_empty() { + return Err(Error::UnexpectedOutput); + } + Ok(()) + } + pub(crate) async fn execute_expression_raw( + &mut self, + expr: impl AsRef<[u8]>, + err_handler: &mut dyn Handler, + ) -> Result { + // Prevent two commands from being executed in parallel, messing with each other. + let _lock = self.executing_command.clone(); + let _guard = _lock.lock().await; + + self.send_command(expr).await?; + // It will be echoed + self.send_command(REPL_DELIMITER).await?; + self.read_until_delimiter(err_handler).await + } + pub(crate) async fn execute_assign(&mut self, expr: impl AsRef) -> Result { + let id = self.allocate_id(); + self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref())) + .await?; + Ok(id) + } + + /// Id should be immediately used + fn allocate_id(&mut self) -> u32 { + if let Some(free) = self.free_list.pop() { + free + } else { + let v = self.next_id; + self.next_id += 1; + v + } + } + // Nix has no way to deallocate variable, yet GC will correct everything not reachable. + // async fn free_id(&mut self, id: u32) -> Result<()> { + // self.execute_expression_empty(format!("sess_field_{id} = null")) + // .await?; + // self.free_list.push(id); + // Ok(()) + // } +} --- /dev/null +++ b/crates/nix-eval/src/value.rs @@ -0,0 +1,291 @@ +use std::{collections::HashMap, fmt, path::PathBuf, sync::Arc}; + +use better_command::NixHandler; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{macros::NixExprBuilder, nix_go, Error, NixSession, Result}; + +#[derive(Clone)] +pub enum Index { + Var(String), + String(String), + #[allow(dead_code)] + Apply(String), + #[allow(dead_code)] + Expr(NixExprBuilder), + ExprApply(NixExprBuilder), + Pipe(NixExprBuilder), +} +impl Index { + pub fn var(v: impl AsRef) -> Self { + let v = v.as_ref(); + assert!( + !(v.contains('.') | v.contains(' ')), + "bad variable name: {v}" + ); + Self::Var(v.to_owned()) + } + pub fn attr(v: impl AsRef) -> Self { + Self::String(v.as_ref().to_owned()) + } + #[allow(dead_code)] + pub fn apply(v: impl Serialize) -> Self { + let serialized = nixlike::serialize(v).expect("invalid value for apply"); + Self::Apply(serialized.trim_end().to_owned()) + } +} +impl fmt::Display for Index { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Index::Var(v) => { + write!(f, "{v}") + } + Index::String(k) => { + let v = nixlike::format_identifier(k.as_str()); + write!(f, ".{v}") + } + Index::Apply(o) => { + write!(f, "({o})") + } + Index::Expr(e) => { + write!(f, "[{}]", e.out) + } + Index::ExprApply(e) => { + write!(f, "({})", e.out) + } + Index::Pipe(e) => { + write!(f, "({})", e.out) + } + } + } +} +impl fmt::Debug for Index { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self}") + } +} +struct PathDisplay<'i>(&'i [Index]); +impl fmt::Display for PathDisplay<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for i in self.0 { + write!(f, "{i}")?; + } + Ok(()) + } +} +struct ValueInner { + full_path: Option>, + session: NixSession, + value: Option, +} +#[derive(Clone)] +pub struct Value(Arc); +impl Value { + fn root(session: NixSession) -> Self { + Self(Arc::new(ValueInner { + full_path: Some(vec![]), + session, + value: None, + })) + } + async fn new(session: NixSession, query: &str) -> Result { + let vid = session.0.lock().await.execute_assign(query).await?; + Ok(Self(Arc::new(ValueInner { + full_path: None, + session, + value: Some(vid), + }))) + } + /// Get a top-level binding. + /// + /// In flake repl session, every output is exposed as top-level binding. + pub async fn binding(session: NixSession, field: &str) -> Result { + Self::root(session).select([Index::var(field)]).await + } + pub async fn select<'a>(&self, name: impl IntoIterator) -> Result { + let mut used_fields = Vec::new(); + let mut name = name.into_iter(); + + let mut full_path = self.0.full_path.clone(); + let mut query = if let Some(id) = self.0.value { + format!("sess_field_{id}") + } else { + let first = name.next(); + if let Some(Index::Var(i)) = first { + if let Some(full_path) = &mut full_path { + full_path.push(Index::Var(i.clone())); + } + i.clone() + } else { + panic!("first path item should be variable, got {first:?}") + } + }; + for v in name { + if let Some(full_path) = &mut full_path { + full_path.push(v.clone()); + } + match v { + Index::Var(_) => panic!("var item may only be first"), + Index::String(s) => { + let escaped = + nixlike::serialize(s).expect("strings are always serialized successfully"); + query.push('.'); + query.push_str(escaped.trim()); + } + Index::Apply(a) => { + // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()` + query = format!("({query} {a})"); + } + Index::Expr(e) => { + let index = Value::new(self.0.session.clone(), &e.out).await?; + used_fields.push(index.clone()); + query.push('.'); + let index = format!("${{sess_field_{}}}", index.0.value.expect("value")); + query.push_str(&index); + } + Index::ExprApply(e) => { + let index = Value::new(self.0.session.clone(), &e.out).await?; + used_fields.push(index.clone()); + query.push(' '); + let index = format!("sess_field_{}", index.0.value.expect("value")); + query.push_str(&index); + query = format!("({query})"); + } + Index::Pipe(v) => { + let index = Value::new(self.0.session.clone(), &v.out).await?; + used_fields.push(index.clone()); + let index = format!("sess_field_{}", index.0.value.expect("value")); + query = format!("({index} {query})"); + } + } + } + + let vid = self + .0 + .session + .0 + .lock() + .await + .execute_assign(&query) + .await + .map_err(|e| e.context(self.attribute()))?; + Ok(Self(Arc::new(ValueInner { + full_path, + session: self.0.session.clone(), + value: Some(vid), + }))) + } + pub async fn as_json(&self) -> Result { + let id = self.0.value.expect("can't serialize root field"); + let query = format!("sess_field_{id}"); + self.0 + .session + .0 + .lock() + .await + .execute_expression_to_json(&query) + .await + .map_err(|e| e.context(self.attribute())) + } + #[allow(dead_code)] + pub async fn has_field(&self, name: &str) -> Result { + let id = self.0.value.expect("can't list root fields"); + let key = nixlike::escape_string(name); + let query = format!("sess_field_{id} ? {key}"); + self.0 + .session + .0 + .lock() + .await + .execute_expression_to_json(&query) + .await + .map_err(|e| e.context(self.attribute())) + } + pub async fn list_fields(&self) -> Result> { + let id = self.0.value.expect("can't list root fields"); + let query = format!("builtins.attrNames sess_field_{id}"); + self.0 + .session + .0 + .lock() + .await + .execute_expression_to_json(&query) + .await + .map_err(|e| e.context(self.attribute())) + } + pub async fn type_of(&self) -> Result { + let id = self.0.value.expect("can't list root fields"); + let query = format!("builtins.typeOf sess_field_{id}"); + self.0 + .session + .0 + .lock() + .await + .execute_expression_to_json(&query) + .await + .map_err(|e| e.context(self.attribute())) + } + #[allow(dead_code)] + pub async fn import(&self) -> Result { + let import = Self::new(self.0.session.clone(), "import").await?; + Ok(nix_go!(self | import)) + } + pub async fn build(&self) -> Result> { + let id = self.0.value.expect("can't use build on not-value"); + let query = format!(":b sess_field_{id}"); + let vid = self + .0 + .session + .0 + .lock() + .await + .execute_expression_raw(&query, &mut NixHandler::default()) + .await?; + if vid.is_empty() { + return Err(Error::BuildFailed { + attribute: self.attribute(), + error: "build produced no output".to_owned(), + }); + } + let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n") + else { + return Err(Error::BuildFailed { + attribute: self.attribute(), + error: format!("failed to parse output: {vid}"), + }); + }; + let outputs = vid + .split('\n') + .filter(|v| !v.is_empty()) + .map(|v| v.split_once(" -> ").expect("unexpected build output")) + .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b))) + .collect(); + Ok(outputs) + } + + fn attribute(&self) -> String { + if let Some(full_path) = &self.0.full_path { + PathDisplay(full_path).to_string() + } else { + "".to_owned() + } + } + + pub(crate) fn session(&self) -> NixSession { + self.0.session.clone() + } + + pub(crate) fn session_field_id(&self) -> u32 { + self.0.value.expect("not root") + } +} +impl Drop for ValueInner { + fn drop(&mut self) { + if let Some(id) = self.value { + if let Ok(mut lock) = self.session.0.try_lock() { + lock.free_list.push(id) + } + // Leaked + } + } +} --- a/crates/nixlike/src/lib.rs +++ b/crates/nixlike/src/lib.rs @@ -1,5 +1,9 @@ //! Serialization/deserialization for nix subset usable for static configurations -//! Serialized results from this library are readable by both this library and standard nix tools +//! +//! Serialized results from this library are readable by both this library and standard nix tools. +//! Nix produced output should also be readable by this library, however, you can't write arbitrary nix +//! expressions and expect it to work, only basic primitives are supported, and there is no +//! variables/recursive records, interpolation, e.t.c. use linked_hash_map::LinkedHashMap; use peg::str::LineCol; @@ -198,9 +202,15 @@ #[test] fn parse_multiline() { + // First line is ignored, unless there is a significant characters. assert_eq!(nixlike::multiline_string("''\n''").expect("parse"), ""); + // Rest of the lines are processed normally. assert_eq!(nixlike::multiline_string("''\n\n''").expect("parse"), "\n"); + // Example with significant character on first line. assert_eq!(nixlike::multiline_string("''t\n''").expect("parse"), "t\n"); + // There might be nothing in multiline string block. assert_eq!(nixlike::multiline_string("''''").expect("parse"), ""); + // And there also might just be spaces, they are removed due to dedent, and output is empty because + // first line was also ignored due to missing significant characters. assert_eq!(nixlike::multiline_string("'' ''").expect("parse"), ""); } --- a/flake.lock +++ b/flake.lock @@ -7,11 +7,11 @@ ] }, "locked": { - "lastModified": 1715274763, - "narHash": "sha256-3Iv1PGHJn9sV3HO4FlOVaaztOxa9uGLfOmUWrH7v7+A=", + "lastModified": 1716569590, + "narHash": "sha256-5eDbq8TuXFGGO3mqJFzhUbt5zHVTf5zilQoyW5jnJwo=", "owner": "ipetkov", "repo": "crane", - "rev": "27025ab71bdca30e7ed0a16c88fd74c5970fc7f5", + "rev": "109987da061a1bf452f435f1653c47511587d919", "type": "github" }, "original": { @@ -40,11 +40,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1715619775, - "narHash": "sha256-c1XVqTH9IeUukc4LcWLzHCSpMfo4Dj4K8t/kLV3c80c=", + "lastModified": 1716658583, + "narHash": "sha256-A93mYmlLvCz0YjQiQ5Tc3DpLrP6Brs+gAlK9nlnSOVg=", "owner": "nixos", "repo": "nixpkgs", - "rev": "0cb78770f66945bb3130f762aef05373e283f2b9", + "rev": "3e280884c0b0e8222ec6b05a99db01505964e1c3", "type": "github" }, "original": { @@ -54,11 +54,28 @@ "type": "github" } }, + "nixpkgs-stable-for-tests": { + "locked": { + "lastModified": 1716361217, + "narHash": "sha256-mzZDr00WUiUXVm1ujBVv6A0qRd8okaITyUp4ezYRgc4=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "46397778ef1f73414b03ed553a3368f0e7e33c2f", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-23.11", + "repo": "nixpkgs", + "type": "github" + } + }, "root": { "inputs": { "crane": "crane", "flake-utils": "flake-utils", "nixpkgs": "nixpkgs", + "nixpkgs-stable-for-tests": "nixpkgs-stable-for-tests", "rust-overlay": "rust-overlay" } }, @@ -72,11 +89,11 @@ ] }, "locked": { - "lastModified": 1715566659, - "narHash": "sha256-OpI0TnN+uE0vvxjPStlTzf5RTohIXVSMwrP9NEgMtaY=", + "lastModified": 1716603336, + "narHash": "sha256-81u/zd7V+XRTq88zwRLxw5GnwZyEiAvGA2BvAXUe864=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "6c465248316cd31502c82f81f1a3acf2d621b01c", + "rev": "4d0f1e4d5d65c23cdbb77e4b0d91940be7309bd4", "type": "github" }, "original": { --- a/flake.nix +++ b/flake.nix @@ -3,6 +3,7 @@ inputs = { nixpkgs.url = "github:nixos/nixpkgs/master"; + nixpkgs-stable-for-tests.url = "github:nixos/nixpkgs/nixos-23.11"; rust-overlay = { url = "github:oxalica/rust-overlay"; inputs = { @@ -21,6 +22,7 @@ rust-overlay, flake-utils, nixpkgs, + nixpkgs-stable-for-tests, crane, }: with nixpkgs.lib; @@ -37,11 +39,37 @@ rust = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; craneLib = (crane.mkLib pkgs).overrideToolchain rust; in { - packages = import ./pkgs { - inherit (pkgs) callPackage; - inherit craneLib; - }; - devShell = craneLib.devShell { + packages = let + packages = import ./pkgs { + inherit (pkgs) callPackage; + inherit craneLib; + }; + in + packages // {default = packages.fleet;}; + + checks = let + packages = import ./pkgs { + inherit (pkgs) callPackage; + craneLib = crane.mkLib (import nixpkgs {inherit system;}); + }; + packages-with-nixpkgs-stable = import ./pkgs { + inherit (pkgs) callPackage; + craneLib = crane.mkLib (import nixpkgs-stable-for-tests {inherit system;}); + }; + prefixAttrs = prefix: attrs: + nixpkgs.lib.attrsets.mapAttrs' (name: value: { + name = "${prefix}${name}"; + value = value.overrideAttrs (prev: { + pname = "${prefix}${prev.pname}"; + }); + }) + attrs; + in + # `fleet` crate wants nightly rust, also little sense of supporting it on stable nixpkgs. + (prefixAttrs "nixpkgs-" (removeAttrs packages ["fleet"])) + // (prefixAttrs "nixpkgs-stable-" (removeAttrs packages-with-nixpkgs-stable ["fleet"])); + + devShells.default = craneLib.devShell { nativeBuildInputs = with pkgs; [ alejandra lld --- a/nixos/fleetPkgs.nix +++ /dev/null @@ -1,24 +0,0 @@ -{...}: { - nixpkgs.overlays = [ - # Not using craneLib here, because we don't want to have two different rust versions for some platforms. - (final: prev: { - fleet-install-secrets = prev.callPackage ({rustPlatform}: - rustPlatform.buildRustPackage rec { - pname = "fleet-install-secrets"; - name = "${pname}"; - - src = ../.; - strictDeps = true; - - buildAndTestSubdir = "cmds/install-secrets"; - - cargoLock = { - lockFile = ../Cargo.lock; - outputHashes = { - "alejandra-3.0.0" = "sha256-q2oTMen8E1YUbNyU4chPOj728/YR0RzdpN+bNjZX2QU="; - }; - }; - }) {}; - }) - ]; -} --- a/pkgs/default.nix +++ b/pkgs/default.nix @@ -1,9 +1,7 @@ { callPackage, craneLib, -}: rec { - default = fleet; - +}: { fleet-install-secrets = callPackage ./fleet-install-secrets.nix {inherit craneLib;}; fleet = callPackage ./fleet.nix {inherit craneLib;}; } --- /dev/null +++ b/pkgs/generator-helper.nix @@ -0,0 +1,14 @@ + +{craneLib}: +craneLib.buildPackage rec { + pname = "fleet-generator-helper"; + + src = craneLib.cleanCargoSource (craneLib.path ../.); + strictDeps = true; + + cargoExtraArgs = "--locked -p ${pname}"; + + postInstall = '' + mv bin/${pname} bin/genhelper + ''; +} --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "nightly-2024-02-10" +channel = "nightly-2024-05-01" components = ["rustfmt", "clippy", "rust-analyzer", "rust-src"] -- gitstuff