difftreelog
refactor more repl abstractions
in: trunk
11 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26 full_delimiter: String,27 nix_handler: ClonableHandler<NixHandler>,28 out: OutputHandler,29 stdin: ChildStdin,30 string_wrapping: (String, String),31 number_wrapping: (String, String),3233 next_id: u32,34 free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41 collected: Vec<String>,42 inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45 fn new(inner: &'i mut H) -> Self {46 Self {47 collected: vec![],48 inner,49 }50 }51}52impl<H> ErrorCollector<'_, H> {53 fn handle_line_inner(&mut self, msg: &str) -> bool {54 let Some(msg) = msg.strip_prefix("@nix ") else {55 return false;56 };57 #[derive(Deserialize)]58 struct ErrorAction {59 action: String,60 level: u32,61 msg: String,62 }63 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64 return false;65 };66 if act.action != "msg" || act.level != 0 {67 return false;68 }69 self.collected.push(act.msg);70 true71 }72 fn finish(self) -> Result<()> {73 // fn dedent(s: String) -> String {74 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75 // }76 if !self.collected.is_empty() {77 bail!(78 "{}",79 self.collected80 .iter()81 .map(|v| {82 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83 let v = unindent::unindent(f.trim_start());84 v.trim().to_owned()85 } else {86 v.to_owned()87 }88 })89 .join("\n")90 );91 }92 Ok(())93 }94 fn flush(self) {95 for line in self.collected {96 warn!("{line}");97 }98 }99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101 fn handle_line(&mut self, e: &str) {102 if self.handle_line_inner(e) {103 return;104 }105 self.inner.handle_line(e)106 }107}108109enum OutputLine {110 Out(String),111 Err(String),112}113struct OutputHandler {114 rx: mpsc::Receiver<OutputLine>,115 _cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118 fn new(out: ChildStdout, err: ChildStderr) -> Self {119 let mut out = FramedRead::new(out, LinesCodec::new());120 let mut err = FramedRead::new(err, LinesCodec::new());121 let (tx, rx) = mpsc::channel(20);122 let (mut cancelled, _cancel_handle) = oneshot::channel();123 tokio::spawn(async move {124 loop {125 select! {126 // We should receive errors earlier than synchronization127 biased;128 e = err.next() => {129 let Some(Ok(e)) = e else {130 if e.is_some() {131 error!("bad repl stderr: {e:?}");132 }133 continue;134 };135 let _ = tx.send(OutputLine::Err(e)).await;136 }137 o = out.next() => {138 let Some(Ok(o)) = o else {139 if o.is_some() {140 error!("bad repl stdout: {o:?}");141 }142 continue;143 };144 let _ = tx.send(OutputLine::Out(o)).await;145 }146 // Reader doesn't care about stdout, as this is cancelled.147 // Error still might be useful, to process leftover span closures?148 _ = cancelled.closed() => {149 break;150 }151 }152 }153 });154 Self { rx, _cancel_handle }155 }156 async fn next(&mut self) -> Option<OutputLine> {157 self.rx.recv().await158 }159}160161struct WarnHandler;162impl Handler for WarnHandler {163 fn handle_line(&mut self, e: &str) {164 warn!(target: "nix", "{e}")165 }166}167168impl NixSessionInner {169 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170 let mut cmd = Command::new("nix");171 cmd.arg("repl")172 .arg(flake)173 .arg("--log-format")174 .arg("internal-json");175 for arg in extra_args {176 cmd.arg(arg);177 }178 cmd.stdin(Stdio::piped());179 cmd.stdout(Stdio::piped());180 cmd.stderr(Stdio::piped());181 let cmd = cmd.spawn()?;182 let stdout = cmd.stdout.unwrap();183 let stderr = cmd.stderr.unwrap();184 let mut out = OutputHandler::new(stdout, stderr);185 let mut stdin = cmd.stdin.unwrap();186 // Standard repl hello doesn't work with internal-json logger187 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188 stdin.write_all(b"\n").await?;189 stdin.flush().await?;190 let nix_handler = NixHandler::default();191 let mut full_delimiter = None;192 let mut errors = vec![];193 while let Some(line) = out.next().await {194 let line = match line {195 OutputLine::Out(o) => o,196 OutputLine::Err(_e) => {197 // Handle startup errors, but skip repl hello?198 errors.push(_e);199 continue;200 }201 };202 if line.contains(REPL_DELIMITER) {203 debug!("discovered repl delimiter with added colors: {line}");204 full_delimiter = Some(line.to_owned());205 break;206 }207 }208 let Some(full_delimiter) = full_delimiter else {209 for e in errors {210 error!("{e}");211 }212 bail!("failed to discover delimiter");213 };214 let mut res = Self {215 full_delimiter,216 nix_handler: ClonableHandler::new(nix_handler),217 out,218 stdin,219 string_wrapping: Default::default(),220 number_wrapping: Default::default(),221222 next_id: 0,223 free_list: vec![],224 };225 res.train().await?;226 Ok(res)227 }228 async fn train(&mut self) -> Result<()> {229 {230 let full_string = self231 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232 .await?;233 let string_offset = full_string.find(TRAIN_STRING).expect("contained");234 let string_prefix = &full_string[..string_offset];235 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237 }238 {239 let full_number = self240 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241 .await?;242 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243 let number_prefix = &full_number[..number_offset];244 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246 }247 Ok(())248 }249 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250 if tracing::enabled!(Level::DEBUG) {251 let cmd_str = String::from_utf8_lossy(cmd.as_ref());252 tracing::debug!("{cmd_str}");253 };254 self.stdin.write_all(cmd.as_ref()).await?;255 self.stdin.write_all(b"\n").await?;256 Ok(())257 }258 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259 let mut out = String::new();260 while let Some(line) = self.out.next().await {261 let line = match line {262 OutputLine::Out(out) => out,263 OutputLine::Err(err) => {264 err_handler.handle_line(&err);265 continue;266 }267 };268 if line == self.full_delimiter {269 return Ok(out);270 }271 if !out.is_empty() {272 out.push('\n');273 }274 out.push_str(&line);275 }276 bail!("didn't reached delimiter");277 }278 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279 let num = self.number_wrapping.clone();280 let n = self.execute_expression_wrapping(expr, &num).await?;281 Ok(n.parse::<u64>()?)282 }283 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284 let num = self.string_wrapping.clone();285 let n = self.execute_expression_wrapping(expr, &num).await?;286 let str: String = serde_json::from_str(&n)?;287 Ok(str)288 }289 async fn execute_expression_to_json<V: DeserializeOwned>(290 &mut self,291 expr: impl AsRef<[u8]>,292 ) -> Result<V> {293 let mut fexpr = b"builtins.toJSON (".to_vec();294 fexpr.extend_from_slice(expr.as_ref());295 fexpr.push(b')');296 let v = self.execute_expression_string(fexpr).await?;297 Ok(serde_json::from_str(&v)?)298 }299 async fn execute_expression_wrapping(300 &mut self,301 expr: impl AsRef<[u8]>,302 wrapping: &(String, String),303 ) -> Result<String> {304 let mut nix_handler = self.nix_handler.clone();305 let mut collected = ErrorCollector::new(&mut nix_handler);306 let res = self.execute_expression_raw(expr, &mut collected).await?;307 if res.is_empty() {308 collected.finish()?;309 bail!("expected expression, got nothing")310 } else {311 collected.flush()312 };313 let Some(res) = res.strip_prefix(&wrapping.0) else {314 bail!("invalid type")315 };316 let Some(res) = res.strip_suffix(&wrapping.1) else {317 bail!("invalid type")318 };319 Ok(res.to_owned())320 }321 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322 let mut nix_handler = self.nix_handler.clone();323 let mut collected = ErrorCollector::new(&mut nix_handler);324 let v = self.execute_expression_raw(expr, &mut collected).await?;325 collected.finish()?;326 ensure!(v.is_empty(), "unexpected expression result");327 Ok(())328 }329 async fn execute_expression_raw(330 &mut self,331 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,333 ) -> Result<String> {334 self.send_command(expr).await?;335 // It will be echoed336 self.send_command(REPL_DELIMITER).await?;337 self.read_until_delimiter(err_handler).await338 }339 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340 let id = self.allocate_id();341 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342 .await?;343 Ok(id)344 }345346 /// Id should be immediately used347 fn allocate_id(&mut self) -> u32 {348 if let Some(free) = self.free_list.pop() {349 free350 } else {351 let v = self.next_id;352 self.next_id += 1;353 v354 }355 }356 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.357 // async fn free_id(&mut self, id: u32) -> Result<()> {358 // self.execute_expression_empty(format!("sess_field_{id} = null"))359 // .await?;360 // self.free_list.push(id);361 // Ok(())362 // }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[macro_export]369macro_rules! nix_path {370 (@o($o:ident) $var:ident $($tt:tt)*) => {{371 $o.push(Index::var(stringify!($var)));372 nix_path!(@o($o) $($tt)*);373 }};374 (@o($o:ident) . $var:ident $($tt:tt)*) => {{375 $o.push(Index::attr(stringify!($var)));376 nix_path!(@o($o) $($tt)*);377 }};378 (@o($o:ident) . $var:literal $($tt:tt)*) => {{379 $o.push(Index::attr($var));380 nix_path!(@o($o) $($tt)*);381 }};382 (@o($o:ident) . { $var:expr } $($tt:tt)*) => {{383 $o.push(Index::attr($var));384 nix_path!(@o($o) $($tt)*);385 }};386 (@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{387 $o.push(Index::idx($var));388 nix_path!(@o($o) $($tt)*);389 }};390 (@o($o:ident) ($e:expr) $($tt:tt)*) => {391 $o.push(Index::apply($e));392 nix_path!(@o($o) $($tt)*);393 };394 (@o($o:ident)) => {};395 ($($tt:tt)+) => {{396 use $crate::{nix_path, better_nix_eval::Index};397 let mut out = vec![];398 nix_path!(@o(out) $($tt)*);399 out400 }}401}402403#[derive(Clone)]404pub enum Index {405 Var(String),406 String(String),407 Apply(String),408 Idx(u32),409}410impl Index {411 pub fn var(v: impl AsRef<str>) -> Self {412 let v = v.as_ref();413 assert!(414 !(v.contains('.') | v.contains(' ')),415 "bad variable name: {v}"416 );417 Self::Var(v.to_owned())418 }419 pub fn attr(v: impl AsRef<str>) -> Self {420 Self::String(v.as_ref().to_owned())421 }422 pub fn idx(v: u32) -> Self {423 Self::Idx(v)424 }425 pub fn apply(v: impl Serialize) -> Self {426 let serialized = nixlike::serialize(v).expect("invalid value for apply");427 Self::Apply(serialized.trim_end().to_owned())428 }429}430impl Display for Index {431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {432 match self {433 Index::Var(v) => {434 write!(f, "{v}")435 }436 Index::String(k) => {437 let v = nixlike::format_identifier(k.as_str());438 write!(f, ".{v}")439 }440 Index::Apply(o) => {441 write!(f, "<apply>({o})")442 }443 Index::Idx(i) => {444 write!(f, "[{i}]")445 }446 }447 }448}449impl fmt::Debug for Index {450 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {451 write!(f, "{self}")452 }453}454struct PathDisplay<'i>(&'i [Index]);455impl Display for PathDisplay<'_> {456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {457 for i in self.0 {458 write!(f, "{i}")?;459 }460 Ok(())461 }462}463pub struct Field {464 full_path: Vec<Index>,465 session: NixSession,466 value: Option<u32>,467}468impl Field {469 fn root(session: NixSession) -> Self {470 Self {471 full_path: vec![],472 session,473 value: None,474 }475 }476 pub async fn field(session: NixSession, field: &str) -> Result<Self> {477 Self::root(session)478 .select([Index::var(field)])479 .await480 }481 pub async fn get_json_deep<'a, V: DeserializeOwned>(482 &self,483 name: impl IntoIterator<Item = Index>,484 ) -> Result<V> {485 let field = self.select(name).await?;486 field.as_json().await487 }488 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {489 let mut name = name.into_iter();490491 let mut full_path = self.full_path.clone();492 let mut query = if let Some(id) = self.value {493 format!("sess_field_{id}")494 } else {495 let first = name.next();496 if let Some(Index::Var(i)) = first {497 full_path.push(Index::Var(i.clone()));498 i.clone()499 } else {500 panic!("first path item should be variable, got {first:?}")501 }502 };503 for v in name {504 full_path.push(v.clone());505 match v {506 Index::Var(_) => panic!("var item may only be first"),507 Index::String(s) => {508 let escaped = nixlike::serialize(s)?;509 query.push('.');510 query.push_str(escaped.trim());511 }512 Index::Apply(a) => {513 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`514 query = format!("({query} {a})");515 }516 Index::Idx(idx) => {517 query = format!("builtins.elemAt ({query}) {idx}");518 }519 }520 }521522 let vid = self523 .session524 .0525 .lock()526 .await527 .execute_assign(&query)528 .await529 .with_context(|| format!("full path: {}", PathDisplay(&full_path)))?;530 Ok(Self {531 full_path,532 session: self.session.clone(),533 value: Some(vid),534 })535 }536 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {537 let id = self.value.expect("can't serialize root field");538 self.session539 .0540 .lock()541 .await542 .execute_expression_to_json(&format!("sess_field_{id}"))543 .await544 .with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))545 }546 pub async fn list_fields(&self) -> Result<Vec<String>> {547 let id = self.value.expect("can't list root fields");548 self.session549 .0550 .lock()551 .await552 .execute_expression_to_json(&format!("builtins.attrNames sess_field_{id}"))553 .await554 .with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))555 }556 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {557 let id = self.value.expect("can't use build on not-value");558 let vid = self559 .session560 .0561 .lock()562 .await563 .execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())564 .await?;565 ensure!(!vid.is_empty(), "build failed: {}", PathDisplay(&self.full_path));566 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")567 else {568 panic!("unexpected build output: {vid:?}");569 };570 let outputs = vid571 .split('\n')572 .filter(|v| !v.is_empty())573 .map(|v| v.split_once(" -> ").expect("unexpected build output"))574 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))575 .collect();576 Ok(outputs)577 }578}579impl Drop for Field {580 fn drop(&mut self) {581 if let Some(id) = self.value {582 if let Ok(mut lock) = self.session.0.try_lock() {583 lock.free_list.push(id)584 }585 // Leaked586 }587 }588}589struct NixSessionPoolInner {590 flake: OsString,591 nix_args: Vec<OsString>,592}593594#[derive(Debug)]595pub struct NixPoolError(anyhow::Error);596impl From<anyhow::Error> for NixPoolError {597 fn from(value: anyhow::Error) -> Self {598 Self(value)599 }600}601impl std::error::Error for NixPoolError {}602impl std::fmt::Display for NixPoolError {603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {604 self.0.fmt(f)605 }606}607impl r2d2::ManageConnection for NixSessionPoolInner {608 type Connection = NixSessionInner;609 type Error = NixPoolError;610 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {611 let _v = TOKIO_RUNTIME612 .get()613 .expect("missed tokio runtime init!")614 .enter();615 Ok(futures::executor::block_on(NixSessionInner::new(616 self.flake.as_os_str(),617 self.nix_args.iter().map(OsString::as_os_str),618 ))?)619 }620621 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {622 let _v = TOKIO_RUNTIME623 .get()624 .expect("missed tokio runtime init!")625 .enter();626 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;627 if res != 4 {628 return Err(anyhow!("sanity check failed").into());629 };630 Ok(())631 }632633 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {634 false635 }636}637pub struct NixSessionPool(Pool<NixSessionPoolInner>);638impl NixSessionPool {639 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {640 let inner = tokio::task::block_in_place(|| {641 r2d2::Builder::<NixSessionPoolInner>::new()642 .min_idle(Some(0))643 .build(NixSessionPoolInner { flake, nix_args })644 })?;645 Ok(Self(inner))646 }647 pub async fn get(&self) -> Result<NixSession> {648 let v = tokio::task::block_in_place(|| self.0.get())?;649 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))650 }651}652653pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26 full_delimiter: String,27 nix_handler: ClonableHandler<NixHandler>,28 out: OutputHandler,29 stdin: ChildStdin,30 string_wrapping: (String, String),31 number_wrapping: (String, String),3233 next_id: u32,34 free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41 collected: Vec<String>,42 inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45 fn new(inner: &'i mut H) -> Self {46 Self {47 collected: vec![],48 inner,49 }50 }51}52impl<H> ErrorCollector<'_, H> {53 fn handle_line_inner(&mut self, msg: &str) -> bool {54 let Some(msg) = msg.strip_prefix("@nix ") else {55 return false;56 };57 #[derive(Deserialize)]58 struct ErrorAction {59 action: String,60 level: u32,61 msg: String,62 }63 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64 return false;65 };66 if act.action != "msg" || act.level != 0 {67 return false;68 }69 self.collected.push(act.msg);70 true71 }72 fn finish(self) -> Result<()> {73 // fn dedent(s: String) -> String {74 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75 // }76 if !self.collected.is_empty() {77 bail!(78 "{}",79 self.collected80 .iter()81 .map(|v| {82 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83 let v = unindent::unindent(f.trim_start());84 v.trim().to_owned()85 } else {86 v.to_owned()87 }88 })89 .join("\n")90 );91 }92 Ok(())93 }94 fn flush(self) {95 for line in self.collected {96 warn!("{line}");97 }98 }99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101 fn handle_line(&mut self, e: &str) {102 if self.handle_line_inner(e) {103 return;104 }105 self.inner.handle_line(e)106 }107}108109enum OutputLine {110 Out(String),111 Err(String),112}113struct OutputHandler {114 rx: mpsc::Receiver<OutputLine>,115 _cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118 fn new(out: ChildStdout, err: ChildStderr) -> Self {119 let mut out = FramedRead::new(out, LinesCodec::new());120 let mut err = FramedRead::new(err, LinesCodec::new());121 let (tx, rx) = mpsc::channel(20);122 let (mut cancelled, _cancel_handle) = oneshot::channel();123 tokio::spawn(async move {124 loop {125 select! {126 // We should receive errors earlier than synchronization127 biased;128 e = err.next() => {129 let Some(Ok(e)) = e else {130 if e.is_some() {131 error!("bad repl stderr: {e:?}");132 }133 continue;134 };135 let _ = tx.send(OutputLine::Err(e)).await;136 }137 o = out.next() => {138 let Some(Ok(o)) = o else {139 if o.is_some() {140 error!("bad repl stdout: {o:?}");141 }142 continue;143 };144 let _ = tx.send(OutputLine::Out(o)).await;145 }146 // Reader doesn't care about stdout, as this is cancelled.147 // Error still might be useful, to process leftover span closures?148 _ = cancelled.closed() => {149 break;150 }151 }152 }153 });154 Self { rx, _cancel_handle }155 }156 async fn next(&mut self) -> Option<OutputLine> {157 self.rx.recv().await158 }159}160161struct WarnHandler;162impl Handler for WarnHandler {163 fn handle_line(&mut self, e: &str) {164 warn!(target: "nix", "{e}")165 }166}167168impl NixSessionInner {169 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170 let mut cmd = Command::new("nix");171 cmd.arg("repl")172 .arg(flake)173 .arg("--log-format")174 .arg("internal-json");175 for arg in extra_args {176 cmd.arg(arg);177 }178 cmd.stdin(Stdio::piped());179 cmd.stdout(Stdio::piped());180 cmd.stderr(Stdio::piped());181 let cmd = cmd.spawn()?;182 let stdout = cmd.stdout.unwrap();183 let stderr = cmd.stderr.unwrap();184 let mut out = OutputHandler::new(stdout, stderr);185 let mut stdin = cmd.stdin.unwrap();186 // Standard repl hello doesn't work with internal-json logger187 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188 stdin.write_all(b"\n").await?;189 stdin.flush().await?;190 let nix_handler = NixHandler::default();191 let mut full_delimiter = None;192 let mut errors = vec![];193 while let Some(line) = out.next().await {194 let line = match line {195 OutputLine::Out(o) => o,196 OutputLine::Err(_e) => {197 // Handle startup errors, but skip repl hello?198 errors.push(_e);199 continue;200 }201 };202 if line.contains(REPL_DELIMITER) {203 debug!("discovered repl delimiter with added colors: {line}");204 full_delimiter = Some(line.to_owned());205 break;206 }207 }208 let Some(full_delimiter) = full_delimiter else {209 for e in errors {210 error!("{e}");211 }212 bail!("failed to discover delimiter");213 };214 let mut res = Self {215 full_delimiter,216 nix_handler: ClonableHandler::new(nix_handler),217 out,218 stdin,219 string_wrapping: Default::default(),220 number_wrapping: Default::default(),221222 next_id: 0,223 free_list: vec![],224 };225 res.train().await?;226 Ok(res)227 }228 async fn train(&mut self) -> Result<()> {229 {230 let full_string = self231 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232 .await?;233 let string_offset = full_string.find(TRAIN_STRING).expect("contained");234 let string_prefix = &full_string[..string_offset];235 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237 }238 {239 let full_number = self240 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241 .await?;242 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243 let number_prefix = &full_number[..number_offset];244 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246 }247 Ok(())248 }249 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250 if tracing::enabled!(Level::DEBUG) {251 let cmd_str = String::from_utf8_lossy(cmd.as_ref());252 tracing::debug!("{cmd_str}");253 };254 self.stdin.write_all(cmd.as_ref()).await?;255 self.stdin.write_all(b"\n").await?;256 Ok(())257 }258 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259 let mut out = String::new();260 while let Some(line) = self.out.next().await {261 let line = match line {262 OutputLine::Out(out) => out,263 OutputLine::Err(err) => {264 err_handler.handle_line(&err);265 continue;266 }267 };268 if line == self.full_delimiter {269 return Ok(out);270 }271 if !out.is_empty() {272 out.push('\n');273 }274 out.push_str(&line);275 }276 bail!("didn't reached delimiter");277 }278 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279 let num = self.number_wrapping.clone();280 let n = self.execute_expression_wrapping(expr, &num).await?;281 Ok(n.parse::<u64>()?)282 }283 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284 let num = self.string_wrapping.clone();285 let n = self.execute_expression_wrapping(expr, &num).await?;286 let str: String = serde_json::from_str(&n)?;287 Ok(str)288 }289 async fn execute_expression_to_json<V: DeserializeOwned>(290 &mut self,291 expr: impl AsRef<[u8]>,292 ) -> Result<V> {293 let mut fexpr = b"builtins.toJSON (".to_vec();294 fexpr.extend_from_slice(expr.as_ref());295 fexpr.push(b')');296 let v = self.execute_expression_string(fexpr).await?;297 Ok(serde_json::from_str(&v)?)298 }299 async fn execute_expression_wrapping(300 &mut self,301 expr: impl AsRef<[u8]>,302 wrapping: &(String, String),303 ) -> Result<String> {304 let mut nix_handler = self.nix_handler.clone();305 let mut collected = ErrorCollector::new(&mut nix_handler);306 let res = self.execute_expression_raw(expr, &mut collected).await?;307 if res.is_empty() {308 collected.finish()?;309 bail!("expected expression, got nothing")310 } else {311 collected.flush()312 };313 let Some(res) = res.strip_prefix(&wrapping.0) else {314 bail!("invalid type")315 };316 let Some(res) = res.strip_suffix(&wrapping.1) else {317 bail!("invalid type")318 };319 Ok(res.to_owned())320 }321 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322 let mut nix_handler = self.nix_handler.clone();323 let mut collected = ErrorCollector::new(&mut nix_handler);324 let v = self.execute_expression_raw(expr, &mut collected).await?;325 collected.finish()?;326 ensure!(v.is_empty(), "unexpected expression result");327 Ok(())328 }329 async fn execute_expression_raw(330 &mut self,331 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,333 ) -> Result<String> {334 self.send_command(expr).await?;335 // It will be echoed336 self.send_command(REPL_DELIMITER).await?;337 self.read_until_delimiter(err_handler).await338 }339 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340 let id = self.allocate_id();341 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342 .await?;343 Ok(id)344 }345346 /// Id should be immediately used347 fn allocate_id(&mut self) -> u32 {348 if let Some(free) = self.free_list.pop() {349 free350 } else {351 let v = self.next_id;352 self.next_id += 1;353 v354 }355 }356 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.357 // async fn free_id(&mut self, id: u32) -> Result<()> {358 // self.execute_expression_empty(format!("sess_field_{id} = null"))359 // .await?;360 // self.free_list.push(id);361 // Ok(())362 // }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370 out: String,371 used_fields: Vec<Field>,372}373impl NixExprBuilder {374 pub fn object() -> Self {375 NixExprBuilder {376 out: "{ ".to_owned(),377 used_fields: Vec::new(),378 }379 }380 pub fn string(s: &str) -> Self {381 NixExprBuilder {382 out: nixlike::serialize(s)383 .expect("no problems with serializing_string")384 .trim_end()385 .to_owned(),386 used_fields: Vec::new(),387 }388 }389 pub fn serialized(v: impl Serialize) -> Self {390 let serialized = nixlike::serialize(v).expect("invalid value for apply");391 Self {392 out: serialized.trim_end().to_owned(),393 used_fields: Vec::new(),394 }395 }396 pub fn field(f: Field) -> Self {397 Self {398 out: format!("sess_field_{}", f.0.value.expect("no value")),399 used_fields: vec![f],400 }401 }402 pub fn end_obj(&mut self) {403 self.out.push('}');404 }405 pub fn obj_key(&mut self, name: Self, value: Self) {406 self.out.push_str(r#""${"#);407 self.extend(name);408 self.out.push_str(r#"}" = "#);409 self.extend(value);410 self.out.push_str("; ");411 }412413 pub fn extend(&mut self, e: Self) {414 self.out.push_str(&e.out);415 self.used_fields.extend(e.used_fields);416 }417418 pub fn session(&self) -> NixSession {419 let mut session = None;420 for ele in &self.used_fields {421 if session.is_none() {422 session = Some(ele.0.session.clone());423 continue;424 }425 let session = &session.as_ref().expect("checked").0;426 let ele_sess = &ele.0.session.0;427 assert!(428 Arc::ptr_eq(session, ele_sess),429 "can't mix fields from different session"430 );431 }432 session.expect("expr without fields used")433 }434 pub fn index_attr(&mut self, s: &str) {435 let escaped = nixlike::serialize(s).expect("string");436 self.out.push('.');437 self.out.push_str(escaped.trim_end());438 }439}440441#[macro_export]442macro_rules! nix_expr_inner {443 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444 use $crate::better_nix_eval::NixExprBuilder;445 let mut out = NixExprBuilder::object();446 $(447 out.obj_key(448 NixExprBuilder::string(stringify!($ident)),449 $crate::nix_expr_inner!($($val)+),450 );451 )*452 out.end_obj();453 out454 }};455 (@field($o:ident) . $var:ident $($tt:tt)*) => {{456 $o.index_attr(stringify!($var));457 nix_expr_inner!(@field($o) $($tt)*);458 }};459 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460 $o.push(Index::attr(&$v));461 nix_expr_inner!(@o($o) $($tt)*);462 }};463 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465 nix_expr_inner!(@o($o) $($tt)*);466 }};467 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469 nix_expr_inner!(@o($o) $($tt)*);470 };471 (@field($o:ident)) => {};472 ($field:ident $($tt:tt)*) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 #[allow(unused_mut, reason = "might be used if indexed")]475 let mut out = NixExprBuilder::field($field);476 nix_expr_inner!(@field(out) $($tt)*);477 out478 }};479 ($v:literal) => {{480 use $crate::better_nix_eval::NixExprBuilder;481 NixExprBuilder::string($v)482 }};483 ({$v:expr}) => {{484 use $crate::better_nix_eval::NixExprBuilder;485 NixExprBuilder::serialized(&$v)486 }}487}488#[macro_export]489macro_rules! nix_expr {490 ($($tt:tt)+) => {{491 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492 let expr = nix_expr_inner!($($tt)+);493 Field::new(expr.session(), expr.out)494 }};495}496497#[macro_export]498macro_rules! nix_go {499 (@o($o:ident) . $var:ident $($tt:tt)*) => {{500 $o.push(Index::attr(stringify!($var)));501 nix_go!(@o($o) $($tt)*);502 }};503 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504 $o.push(Index::attr(&$v));505 nix_go!(@o($o) $($tt)*);506 }};507 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509 nix_go!(@o($o) $($tt)*);510 }};511 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513 nix_go!(@o($o) $($tt)*);514 };515 (@o($o:ident)) => {};516 ($field:ident $($tt:tt)+) => {{517 use $crate::{nix_go, better_nix_eval::Index};518 let field = $field.clone();519 let mut out = vec![];520 nix_go!(@o(out) $($tt)*);521 field.select(out).await?522 }}523}524#[macro_export]525macro_rules! nix_go_json {526 ($($tt:tt)*) => {{527 $crate::nix_go!($($tt)*).as_json().await?528 }};529}530531#[derive(Clone)]532pub enum Index {533 Var(String),534 String(String),535 Apply(String),536 Expr(NixExprBuilder),537 ExprApply(NixExprBuilder),538}539impl Index {540 pub fn var(v: impl AsRef<str>) -> Self {541 let v = v.as_ref();542 assert!(543 !(v.contains('.') | v.contains(' ')),544 "bad variable name: {v}"545 );546 Self::Var(v.to_owned())547 }548 pub fn attr(v: impl AsRef<str>) -> Self {549 Self::String(v.as_ref().to_owned())550 }551 pub fn apply(v: impl Serialize) -> Self {552 let serialized = nixlike::serialize(v).expect("invalid value for apply");553 Self::Apply(serialized.trim_end().to_owned())554 }555}556impl Display for Index {557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558 match self {559 Index::Var(v) => {560 write!(f, "{v}")561 }562 Index::String(k) => {563 let v = nixlike::format_identifier(k.as_str());564 write!(f, ".{v}")565 }566 Index::Apply(o) => {567 write!(f, "<apply>({o})")568 }569 Index::Expr(e) => {570 write!(f, "[{}]", e.out)571 }572 Index::ExprApply(e) => {573 write!(f, "<apply>({})", e.out)574 }575 }576 }577}578impl fmt::Debug for Index {579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580 write!(f, "{self}")581 }582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 for i in self.0 {587 write!(f, "{i}")?;588 }589 Ok(())590 }591}592struct FieldInner {593 full_path: Option<Vec<Index>>,594 session: NixSession,595 value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598 if let Some(full_path) = &full_path {599 format!("full path: {}", PathDisplay(full_path))600 } else {601 format!("query: {query:?}")602 }603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607 fn root(session: NixSession) -> Self {608 Self(Arc::new(FieldInner {609 full_path: Some(vec![]),610 session,611 value: None,612 }))613 }614 async fn new(session: NixSession, query: &str) -> Result<Self> {615 let vid = session616 .0617 .lock()618 .await619 .execute_assign(query)620 .await621 .with_context(|| context(None, query))?;622 Ok(Self(Arc::new(FieldInner {623 full_path: None,624 session,625 value: Some(vid),626 })))627 }628 pub async fn field(session: NixSession, field: &str) -> Result<Self> {629 Self::root(session).select([Index::var(field)]).await630 }631 pub async fn get_json_deep<'a, V: DeserializeOwned>(632 &self,633 name: impl IntoIterator<Item = Index>,634 ) -> Result<V> {635 let field = self.select(name).await?;636 field.as_json().await637 }638 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {639 let mut used_fields = Vec::new();640 let mut name = name.into_iter();641642 let mut full_path = self.0.full_path.clone();643 let mut query = if let Some(id) = self.0.value {644 format!("sess_field_{id}")645 } else {646 let first = name.next();647 if let Some(Index::Var(i)) = first {648 if let Some(full_path) = &mut full_path {649 full_path.push(Index::Var(i.clone()));650 }651 i.clone()652 } else {653 panic!("first path item should be variable, got {first:?}")654 }655 };656 for v in name {657 if let Some(full_path) = &mut full_path {658 full_path.push(v.clone());659 }660 match v {661 Index::Var(_) => panic!("var item may only be first"),662 Index::String(s) => {663 let escaped = nixlike::serialize(s)?;664 query.push('.');665 query.push_str(escaped.trim());666 }667 Index::Apply(a) => {668 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`669 query = format!("({query} {a})");670 }671 Index::Expr(e) => {672 let index = Field::new(self.0.session.clone(), &e.out).await?;673 used_fields.push(index.clone());674 query.push('.');675 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));676 query.push_str(&index);677 }678 Index::ExprApply(e) => {679 let index = Field::new(self.0.session.clone(), &e.out).await?;680 used_fields.push(index.clone());681 query.push(' ');682 let index = format!("sess_field_{}", index.0.value.expect("value"));683 query.push_str(&index);684 query = format!("({query})");685 }686 }687 }688689 let vid = self690 .0691 .session692 .0693 .lock()694 .await695 .execute_assign(&query)696 .await697 .with_context(|| {698 if let Some(full_path) = &full_path {699 format!("full path: {}", PathDisplay(full_path))700 } else {701 format!("query: {query:?}")702 }703 })?;704 Ok(Self(Arc::new(FieldInner {705 full_path,706 session: self.0.session.clone(),707 value: Some(vid),708 })))709 }710 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {711 let id = self.0.value.expect("can't serialize root field");712 let query = format!("sess_field_{id}");713 self.0714 .session715 .0716 .lock()717 .await718 .execute_expression_to_json(&query)719 .await720 .with_context(|| context(self.0.full_path.as_deref(), &query))721 }722 pub async fn list_fields(&self) -> Result<Vec<String>> {723 let id = self.0.value.expect("can't list root fields");724 let query = format!("builtins.attrNames sess_field_{id}");725 self.0726 .session727 .0728 .lock()729 .await730 .execute_expression_to_json(&query)731 .await732 .with_context(|| context(self.0.full_path.as_deref(), &query))733 }734 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {735 let id = self.0.value.expect("can't use build on not-value");736 let query = format!(":b sess_field_{id}");737 let vid = self738 .0739 .session740 .0741 .lock()742 .await743 .execute_expression_raw(&query, &mut NixHandler::default())744 .await?;745 ensure!(746 !vid.is_empty(),747 "build failed: {}",748 context(self.0.full_path.as_deref(), &query),749 );750 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")751 else {752 panic!("unexpected build output: {vid:?}");753 };754 let outputs = vid755 .split('\n')756 .filter(|v| !v.is_empty())757 .map(|v| v.split_once(" -> ").expect("unexpected build output"))758 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))759 .collect();760 Ok(outputs)761 }762}763impl Drop for FieldInner {764 fn drop(&mut self) {765 if let Some(id) = self.value {766 if let Ok(mut lock) = self.session.0.try_lock() {767 lock.free_list.push(id)768 }769 // Leaked770 }771 }772}773struct NixSessionPoolInner {774 flake: OsString,775 nix_args: Vec<OsString>,776}777778#[derive(Debug)]779pub struct NixPoolError(anyhow::Error);780impl From<anyhow::Error> for NixPoolError {781 fn from(value: anyhow::Error) -> Self {782 Self(value)783 }784}785impl std::error::Error for NixPoolError {}786impl std::fmt::Display for NixPoolError {787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {788 self.0.fmt(f)789 }790}791impl r2d2::ManageConnection for NixSessionPoolInner {792 type Connection = NixSessionInner;793 type Error = NixPoolError;794 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {795 let _v = TOKIO_RUNTIME796 .get()797 .expect("missed tokio runtime init!")798 .enter();799 Ok(futures::executor::block_on(NixSessionInner::new(800 self.flake.as_os_str(),801 self.nix_args.iter().map(OsString::as_os_str),802 ))?)803 }804805 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {806 let _v = TOKIO_RUNTIME807 .get()808 .expect("missed tokio runtime init!")809 .enter();810 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;811 if res != 4 {812 return Err(anyhow!("sanity check failed").into());813 };814 Ok(())815 }816817 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {818 false819 }820}821pub struct NixSessionPool(Pool<NixSessionPoolInner>);822impl NixSessionPool {823 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {824 let inner = tokio::task::block_in_place(|| {825 r2d2::Builder::<NixSessionPoolInner>::new()826 .min_idle(Some(0))827 .build(NixSessionPoolInner { flake, nix_args })828 })?;829 Ok(Self(inner))830 }831 pub async fn get(&self) -> Result<NixSession> {832 let v = tokio::task::block_in_place(|| self.0.get())?;833 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))834 }835}836837pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -4,8 +4,8 @@
use crate::command::MyCommand;
use crate::host::Config;
-use crate::nix_path;
-use anyhow::{anyhow, Result, Context};
+use crate::nix_go;
+use anyhow::{anyhow, Result};
use clap::Parser;
use itertools::Itertools;
use tokio::{task::LocalSet, time::sleep};
@@ -290,12 +290,10 @@
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
let action = Action::from(self.subcommand.clone());
- let drv = config
- .fleet_field
- .select(nix_path!(.buildSystems((serde_json::json!({
- "localSystem": config.local_system.clone(),
- }))).{action.build_attr()}.{&host}))
- .await.context("system attribute")?;
+ let fleet_field = &config.fleet_field;
+ let drv = nix_go!(fleet_field.buildSystems(Obj {
+ localSystem: { config.local_system.clone() }
+ }));
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
info!("sd-image build failed");
cmds/fleet/src/cmds/info.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use crate::host::Config;
-use crate::nix_path;
+use crate::nix_go_json;
use anyhow::{ensure, Result};
use clap::Parser;
@@ -37,12 +37,9 @@
InfoCmd::ListHosts { ref tagged } => {
'host: for host in config.list_hosts().await? {
if !tagged.is_empty() {
- let tags: Vec<String> = config
- .fleet_field
- .select(nix_path!(.configuredSystems.{&host.name}.config.tags))
- .await?
- .as_json()
- .await?;
+ let fleet_field = &config.fleet_field;
+ let tags: Vec<String> =
+ nix_go_json!(fleet_field.configuredSystems[{ host.name }].config.tags);
for tag in tagged {
if !tags.contains(tag) {
continue 'host;
@@ -64,20 +61,12 @@
let mut out = <BTreeSet<String>>::new();
let host = config.system_config(&host).await?;
if external {
- out.extend(
- host.select(nix_path!(.network.externalIps))
- .await?
- .as_json::<Vec<String>>()
- .await?,
- );
+ let data: Vec<String> = nix_go_json!(host.network.externalIps);
+ out.extend(data);
}
if internal {
- out.extend(
- host.select(nix_path!(.network.internalIps))
- .await?
- .as_json::<Vec<String>>()
- .await?,
- );
+ let data: Vec<String> = nix_go_json!(host.network.internalIps);
+ out.extend(data);
}
for ip in out {
data.push(ip);
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,9 +1,10 @@
use crate::{
fleetdata::{FleetSecret, FleetSharedSecret},
- host::Config, nix_path,
+ host::Config,
+ nix_go, nix_go_json,
};
-use anyhow::{bail, ensure, Context, Result};
-use chrono::Utc;
+use anyhow::{anyhow, bail, ensure, Context, Result};
+use chrono::{DateTime, Utc};
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
use owo_colors::OwoColorize;
@@ -17,8 +18,8 @@
use tracing::{error, info, info_span, warn};
#[derive(Parser)]
-pub enum Secrets {
- /// Force load keys for all defined hosts
+pub enum Secret {
+ /// Force load host keys for all defined hosts
ForceKeys,
/// Add secret, data should be provided in stdin
AddShared {
@@ -29,14 +30,20 @@
/// Override secret if already present
#[clap(long)]
force: bool,
+ /// Secret public part
#[clap(long)]
public: Option<String>,
+ /// Load public part from specified file
#[clap(long)]
public_file: Option<PathBuf>,
+ /// Create a notification on secret expiration
+ #[clap(long)]
+ expires_at: Option<DateTime<Utc>>,
+
/// Secret with this name already exists, override its value while keeping the same owners.
#[clap(long)]
- readd: bool,
+ re_add: bool,
},
/// Add secret, data should be provided in stdin
Add {
@@ -81,12 +88,33 @@
prefer_identities: Vec<String>,
},
List {},
+ InvokeGenerator,
}
-impl Secrets {
+impl Secret {
pub async fn run(self, config: &Config) -> Result<()> {
match self {
- Secrets::ForceKeys => {
+ Secret::InvokeGenerator => {
+ let config_field = &config.config_unchecked_field;
+
+ let generate_impure =
+ nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+ let on = nix_go!(generate_impure.on);
+ let call_package = nix_go!(
+ config_field.buildableSystems(Obj {
+ localSystem: { config.local_system.clone() }
+ })[on]
+ .config
+ .nixpkgs
+ .pkgs
+ .callPackage
+ );
+ let generator = nix_go!(call_package(generate_impure.generator));
+ let built = generator.build().await?;
+ // .as_json().await?;
+ dbg!(&built);
+ }
+ Secret::ForceKeys => {
for host in config.list_hosts().await? {
if config.should_skip(&host.name) {
continue;
@@ -94,19 +122,20 @@
config.key(&host.name).await?;
}
}
- Secrets::AddShared {
+ Secret::AddShared {
mut machines,
name,
force,
public,
public_file,
- readd,
+ expires_at,
+ re_add,
} => {
let exists = config.has_shared(&name);
- if exists && !force && !readd {
+ if exists && !force && !re_add {
bail!("secret already defined");
}
- if readd {
+ if re_add {
// Fixme: use clap to limit this usage
ensure!(!force, "--force and --readd are not compatible");
ensure!(exists, "secret doesn't exists");
@@ -137,7 +166,7 @@
.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)
.collect();
let mut encryptor = age::Encryptor::with_recipients(recipients)
- .expect("recipients provided")
+ .ok_or_else(|| anyhow!("no recipients provided"))?
.wrap_output(&mut encrypted)?;
io::copy(&mut Cursor::new(input), &mut encryptor)?;
encryptor.finish()?;
@@ -150,7 +179,7 @@
owners: machines,
secret: FleetSecret {
created_at: Utc::now(),
- expires_at: None,
+ expires_at,
secret,
public: match (public, public_file) {
(Some(v), None) => Some(v),
@@ -164,7 +193,7 @@
},
);
}
- Secrets::Add {
+ Secret::Add {
machine,
name,
force,
@@ -211,7 +240,7 @@
}
// TODO: Instead of using sudo, decode secret on remote machine
#[allow(clippy::await_holding_refcell_ref)]
- Secrets::Read {
+ Secret::Read {
name,
machine,
plaintext,
@@ -228,7 +257,7 @@
println!("{}", z85::encode(&data));
}
}
- Secrets::UpdateShared {
+ Secret::UpdateShared {
name,
machines,
mut add_machines,
@@ -321,7 +350,7 @@
secret.secret.secret = encrypted;
config.replace_shared(name, secret);
}
- Secrets::Regenerate { prefer_identities } => {
+ Secret::Regenerate { prefer_identities } => {
{
let expected_shared_set = config
.list_configured_shared()
@@ -337,10 +366,9 @@
for name in &config.list_shared() {
info!("updating secret: {name}");
let mut data = config.shared_secret(name)?;
- let expected_owners: Vec<String> = config
- .config_field
- .get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
- .await?;
+ let config_field = &config.config_field;
+ let expected_owners: Vec<String> =
+ nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);
if expected_owners.is_empty() {
warn!("secret was removed from fleet config: {name}, removing from data");
to_remove.push(name.to_string());
@@ -350,10 +378,8 @@
let expected_set = expected_owners.iter().collect::<HashSet<_>>();
let should_remove = set.difference(&expected_set).next().is_some();
if set != expected_set {
- let owner_dependent: bool = config
- .config_field
- .get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
- .await?;
+ let owner_dependent: bool =
+ nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);
if !owner_dependent {
warn!("reencrypting secret '{name}' for new owner set");
// TODO: force regeneration
@@ -401,7 +427,7 @@
config.remove_shared(&k);
}
}
- Secrets::List {} => {
+ Secret::List {} => {
let _span = info_span!("loading secrets").entered();
let configured = config.list_configured_shared().await?;
#[derive(Tabled)]
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -337,6 +337,8 @@
if !text.is_empty()
&& text != "querying info about missing paths"
&& text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
{
let span = info_span!("job");
span.pb_start();
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -16,7 +16,7 @@
better_nix_eval::{Field, NixSessionPool},
command::MyCommand,
fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
- nix_path,
+ nix_go, nix_go_json,
};
pub struct FleetConfigInternals {
@@ -29,6 +29,8 @@
pub fleet_field: Field,
/// fleet_config.configUnchecked
pub config_field: Field,
+ /// fleet_config.unchecked
+ pub config_unchecked_field: Field,
}
#[derive(Clone)]
@@ -95,12 +97,8 @@
}
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
- let names = self
- .fleet_field
- .select(nix_path!(.configuredHosts))
- .await?
- .list_fields()
- .await?;
+ let fleet_field = &self.fleet_field;
+ let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
let mut out = vec![];
for name in names {
out.push(ConfigHost { name })
@@ -108,9 +106,8 @@
Ok(out)
}
pub async fn system_config(&self, host: &str) -> Result<Field> {
- self.fleet_field
- .select(nix_path!(.configuredSystems.{host}.config))
- .await
+ let fleet_field = &self.fleet_field;
+ Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))
}
pub(super) fn data(&self) -> MutexGuard<FleetData> {
@@ -121,11 +118,8 @@
}
/// Shared secrets configured in fleet.nix or in flake
pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
- self.config_field
- .select(nix_path!(.sharedSecrets))
- .await?
- .list_fields()
- .await
+ let config_field = &self.config_field;
+ nix_go!(config_field.sharedSecrets).list_fields().await
}
/// Shared secrets configured in fleet.nix
pub fn list_shared(&self) -> Vec<String> {
@@ -211,11 +205,10 @@
Ok(secret.clone())
}
pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
- self.config_field
- .select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
- .await?
- .as_json()
- .await
+ let config_field = &self.config_field;
+ Ok(nix_go_json!(
+ config_field.sharedSecrets[{ secret }].expectedOwners
+ ))
}
pub fn save(&self) -> Result<()> {
@@ -269,21 +262,15 @@
if self.local_system == "detect" {
let builtins_field = Field::field(root_field.clone(), "builtins").await?;
- let system = builtins_field
- .select(nix_path!(.currentSystem))
- .await?;
- self.local_system = system.as_json().await?;
+ self.local_system = nix_go_json!(builtins_field.currentSystem);
}
let local_system = self.local_system.clone();
let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
- let fleet_field = fleet_root
- .select(nix_path!(.default))
- .await?;
- let config_field = fleet_field
- .select(nix_path!(.configUnchecked))
- .await?;
+ let fleet_field = nix_go!(fleet_root.default);
+ let config_field = nix_go!(fleet_field.configUnchecked);
+ let config_unchecked_field = nix_go!(fleet_field.unchecked);
let mut fleet_data_path = directory.clone();
fleet_data_path.push("fleet.nix");
@@ -298,6 +285,7 @@
nix_args,
fleet_field,
config_field,
+ config_unchecked_field,
})))
}
}
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,5 +1,5 @@
#![recursion_limit = "512"]
-#![feature(try_blocks)]
+#![feature(try_blocks, lint_reasons)]
pub(crate) mod cmds;
pub(crate) mod command;
@@ -17,7 +17,7 @@
use anyhow::{bail, Result};
use clap::Parser;
-use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};
+use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};
use futures::future::LocalBoxFuture;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
@@ -73,7 +73,7 @@
BuildSystems(BuildSystems),
/// Secret management
#[clap(subcommand)]
- Secrets(Secrets),
+ Secret(Secret),
/// Upload prefetch directory to the nix store
Prefetch(Prefetch),
/// Config parsing
@@ -92,7 +92,7 @@
async fn run_command(config: &Config, command: Opts) -> Result<()> {
match command {
Opts::BuildSystems(c) => c.run(config).await?,
- Opts::Secrets(s) => s.run(config).await?,
+ Opts::Secret(s) => s.run(config).await?,
Opts::Info(i) => i.run(config).await?,
Opts::Prefetch(p) => p.run(config).await?,
};
flake.lockdiffbeforeafterboth--- a/flake.lock
+++ b/flake.lock
@@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
- "lastModified": 1694529238,
- "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
+ "lastModified": 1701680307,
+ "narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"owner": "numtide",
"repo": "flake-utils",
- "rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
+ "rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"type": "github"
},
"original": {
@@ -38,11 +38,11 @@
},
"nixpkgs": {
"locked": {
- "lastModified": 1698350982,
- "narHash": "sha256-zoEV8Ad3bOAejp0ys/mOpaHSWrzK+GupZwGGYfuWuEY=",
+ "lastModified": 1703705939,
+ "narHash": "sha256-9s2Ep3NyRDj9HUgfv2TQUwQEanRUAmeXkvKIr/o1XbY=",
"owner": "nixos",
"repo": "nixpkgs",
- "rev": "dd83f9de26ff7c0326468b659ea4729fa5cf6262",
+ "rev": "1ada32da4ba24d7310653c9ac54888bee463f455",
"type": "github"
},
"original": {
@@ -67,11 +67,11 @@
]
},
"locked": {
- "lastModified": 1698199907,
- "narHash": "sha256-n8RtHBIb0rLuYs4RDehW6mj6r6Yam/ODY1af/VCcurw=",
+ "lastModified": 1703643208,
+ "narHash": "sha256-UL4KO8JxnD5rOycwHqBAf84lExF1/VnYMDC7b/wpPDU=",
"owner": "oxalica",
"repo": "rust-overlay",
- "rev": "22b8d29fd22cfaa2c311e0d6fd8a0ed9c2a1152b",
+ "rev": "ce117f3e0de8262be8cd324ee6357775228687cf",
"type": "github"
},
"original": {
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -3,35 +3,52 @@
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/master";
- rust-overlay = { url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; };
- flake-utils = { url = "github:numtide/flake-utils"; };
+ rust-overlay = {
+ url = "github:oxalica/rust-overlay";
+ inputs.nixpkgs.follows = "nixpkgs";
+ };
+ flake-utils = {url = "github:numtide/flake-utils";};
};
- outputs = { self, rust-overlay, flake-utils, nixpkgs }: with nixpkgs.lib; rec {
- lib = import ./lib { inherit flake-utils; };
- } // flake-utils.lib.eachDefaultSystem (system:
- let
- pkgs = import nixpkgs
- {
- inherit system; overlays = [ (import rust-overlay) ];
- };
- llvmPkgs = pkgs.buildPackages.llvmPackages_11;
- rust = (pkgs.rustChannelOf { date = "2023-10-20"; channel = "nightly"; }).default.override { extensions = [ "rust-src" "rust-analyzer" ]; };
- rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
- in
- {
- packages = (import ./pkgs) pkgs pkgs;
- devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
- nativeBuildInputs = with pkgs; [
- rust
- lld
- cargo-edit
- cargo-udeps
- cargo-fuzz
+ outputs = {
+ self,
+ rust-overlay,
+ flake-utils,
+ nixpkgs,
+ }:
+ with nixpkgs.lib;
+ {
+ lib = import ./lib {inherit flake-utils;};
+ }
+ // flake-utils.lib.eachDefaultSystem (system: let
+ pkgs =
+ import nixpkgs
+ {
+ inherit system;
+ overlays = [(import rust-overlay)];
+ };
+ llvmPkgs = pkgs.buildPackages.llvmPackages_11;
+ rust =
+ (pkgs.rustChannelOf {
+ date = "2023-12-26";
+ channel = "nightly";
+ })
+ .default
+ .override {extensions = ["rust-src" "rust-analyzer"];};
+ in {
+ packages = (import ./pkgs) pkgs pkgs;
+ devShell = (pkgs.mkShell.override {stdenv = llvmPkgs.stdenv;}) {
+ nativeBuildInputs = with pkgs; [
+ rust
+ lld
+ cargo-edit
+ cargo-udeps
+ cargo-fuzz
+ cargo-watch
- pkg-config
- openssl
- bacon
- ];
- };
- });
+ pkg-config
+ openssl
+ bacon
+ ];
+ };
+ });
}
lib/default.nixdiffbeforeafterboth--- a/lib/default.nix
+++ b/lib/default.nix
@@ -10,80 +10,99 @@
fleetLib = import ./fleetLib.nix {
inherit nixpkgs hostNames;
};
- in
- let
- withData = data: rec {
- root = nixpkgs.lib.evalModules {
- modules = (import ../modules/fleet/_modules.nix) ++ [config data];
- specialArgs = {
- inherit nixpkgs fleetLib;
- };
- };
- failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
- rootAssertWarn =
- if failedAssertions != []
- then throw "Failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
- else nixpkgs.lib.showWarnings root.config.warnings root;
- configuredHosts = rootAssertWarn.config.hosts;
- configuredSecrets = rootAssertWarn.config.secrets;
- configuredSystems = configuredSystemsWithExtraModules [];
- configuredSystemsWithExtraModules = extraModules:
- nixpkgs.lib.listToAttrs (
- map
- (
- name: {
- inherit name;
- value = nixpkgs.lib.nixosSystem {
- system = configuredHosts.${name}.system;
- modules = configuredHosts.${name}.modules ++ extraModules;
- specialArgs = {
- inherit fleetLib;
- fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
- };
+ in let
+ root = nixpkgs.lib.evalModules {
+ modules = (import ../modules/fleet/_modules.nix) ++ [config data];
+ specialArgs = {
+ inherit nixpkgs fleetLib;
+ };
+ };
+ failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
+ checkedRoot =
+ if failedAssertions != []
+ then throw "Fleet failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
+ else nixpkgs.lib.showWarnings root.config.warnings root;
+ withData = {
+ root,
+ data,
+ }: rec {
+ configuredHosts = root.config.hosts;
+ configuredUncheckedHosts = root.config.hosts;
+ configuredSystems = configuredSystemsWithExtraModules [];
+ configuredSystemsWithExtraModules = extraModules:
+ nixpkgs.lib.listToAttrs (
+ map
+ (
+ name: {
+ inherit name;
+ value = nixpkgs.lib.nixosSystem {
+ system = configuredHosts.${name}.system;
+ modules = configuredHosts.${name}.modules ++ extraModules;
+ specialArgs = {
+ inherit fleetLib;
+ fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
};
- }
- )
- (builtins.attrNames rootAssertWarn.config.hosts)
- );
- buildSystems = {localSystem}: let
- buildConfigurationModule = {config, ...}: {
- # Equivalent to nixpkgs.localSystem
- # nixpkgs.system = localSystem;
- nixpkgs.buildPlatform.system = localSystem;
- };
- in {
- toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
- buildConfigurationModule
- ({...}: {
- buildTarget = "toplevel";
- })
- ]);
- sdImage = builtins.mapAttrs (_name: value: value.config.system.build.sdImage) (configuredSystemsWithExtraModules [
- buildConfigurationModule
- #(nixpkgs + "/nixos/modules/installer/sd-card/sd-image-aarch64-installer.nix")
- ({...}: {
- buildTarget = "sd-image";
- })
- ]);
- installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
- buildConfigurationModule
- (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
- ({lib, ...}: {
- buildTarget = "installation-cd";
- # Needed for https://github.com/NixOS/nixpkgs/issues/58959
- boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
- })
- ]);
+ };
+ }
+ )
+ (builtins.attrNames root.config.hosts)
+ );
+ buildableSystems = {localSystem}: let
+ buildConfigurationModule = {config, ...}: {
+ # Equivalent to nixpkgs.localSystem
+ # nixpkgs.system = localSystem;
+ nixpkgs.buildPlatform.system = localSystem;
+ };
+ in
+ configuredSystemsWithExtraModules [
+ buildConfigurationModule
+ ];
+ buildSystems = {localSystem}: let
+ buildConfigurationModule = {config, ...}: {
+ # Equivalent to nixpkgs.localSystem
+ # nixpkgs.system = localSystem;
+ nixpkgs.buildPlatform.system = localSystem;
};
- configUnchecked = root.config;
- };
- defaultData = withData data;
- in rec {
- inherit (defaultData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
- injectData = data: let
- injectedData = withData data;
in {
- inherit (injectedData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+ toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
+ buildConfigurationModule
+ ({...}: {
+ buildTarget = "toplevel";
+ })
+ ]);
+ sdImage = builtins.mapAttrs (_name: value: value.config.system.build.sdImage) (configuredSystemsWithExtraModules [
+ buildConfigurationModule
+ #(nixpkgs + "/nixos/modules/installer/sd-card/sd-image-aarch64-installer.nix")
+ ({...}: {
+ buildTarget = "sd-image";
+ })
+ ]);
+ installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
+ buildConfigurationModule
+ (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
+ ({lib, ...}: {
+ buildTarget = "installation-cd";
+ # Needed for https://github.com/NixOS/nixpkgs/issues/58959
+ boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
+ })
+ ]);
};
+ configUnchecked = root.config;
+ };
+ defaultData = withData {
+ inherit data;
+ root = checkedRoot;
+ };
+ uncheckedData = withData {inherit data root;};
+ in rec {
+ inherit (defaultData) configuredHosts configuredSystems buildSystems configUnchecked buildableSystems;
+ unchecked = {
+ inherit (uncheckedData) configuredHosts configuredSystems buildSystems configUnchecked buildableSystems;
+ };
+ injectData = data: let
+ injectedData = withData data;
+ in {
+ inherit (injectedData) configuredHosts configuredSystems buildSystems configUnchecked;
};
+ };
}
modules/fleet/secrets.nixdiffbeforeafterboth--- a/modules/fleet/secrets.nix
+++ b/modules/fleet/secrets.nix
@@ -15,6 +15,9 @@
type = bool;
description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";
};
+ generateImpure = mkOption {
+ type = unspecified;
+ };
generator = mkOption {
type = nullOr (submodule {
packages = mkOption {