difftreelog
refactor minor rewrites
in: trunk
7 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26 full_delimiter: String,27 nix_handler: ClonableHandler<NixHandler>,28 out: OutputHandler,29 stdin: ChildStdin,30 string_wrapping: (String, String),31 number_wrapping: (String, String),3233 next_id: u32,34 free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41 collected: Vec<String>,42 inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45 fn new(inner: &'i mut H) -> Self {46 Self {47 collected: vec![],48 inner,49 }50 }51}52impl<H> ErrorCollector<'_, H> {53 fn handle_line_inner(&mut self, msg: &str) -> bool {54 let Some(msg) = msg.strip_prefix("@nix ") else {55 return false;56 };57 #[derive(Deserialize)]58 struct ErrorAction {59 action: String,60 level: u32,61 msg: String,62 }63 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64 return false;65 };66 if act.action != "msg" || act.level != 0 {67 return false;68 }69 self.collected.push(act.msg);70 true71 }72 fn finish(self) -> Result<()> {73 // fn dedent(s: String) -> String {74 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75 // }76 if !self.collected.is_empty() {77 bail!(78 "{}",79 self.collected80 .iter()81 .map(|v| {82 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83 let v = unindent::unindent(f.trim_start());84 v.trim().to_owned()85 } else {86 v.to_owned()87 }88 })89 .join("\n")90 );91 }92 Ok(())93 }94 fn flush(self) {95 for line in self.collected {96 warn!("{line}");97 }98 }99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101 fn handle_line(&mut self, e: &str) {102 if self.handle_line_inner(e) {103 return;104 }105 self.inner.handle_line(e)106 }107}108109enum OutputLine {110 Out(String),111 Err(String),112}113struct OutputHandler {114 rx: mpsc::Receiver<OutputLine>,115 _cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118 fn new(out: ChildStdout, err: ChildStderr) -> Self {119 let mut out = FramedRead::new(out, LinesCodec::new());120 let mut err = FramedRead::new(err, LinesCodec::new());121 let (tx, rx) = mpsc::channel(20);122 let (mut cancelled, _cancel_handle) = oneshot::channel();123 tokio::spawn(async move {124 loop {125 select! {126 // We should receive errors earlier than synchronization127 biased;128 e = err.next() => {129 let Some(Ok(e)) = e else {130 if e.is_some() {131 error!("bad repl stderr: {e:?}");132 }133 continue;134 };135 let _ = tx.send(OutputLine::Err(e)).await;136 }137 o = out.next() => {138 let Some(Ok(o)) = o else {139 if o.is_some() {140 error!("bad repl stdout: {o:?}");141 }142 continue;143 };144 let _ = tx.send(OutputLine::Out(o)).await;145 }146 // Reader doesn't care about stdout, as this is cancelled.147 // Error still might be useful, to process leftover span closures?148 _ = cancelled.closed() => {149 break;150 }151 }152 }153 });154 Self { rx, _cancel_handle }155 }156 async fn next(&mut self) -> Option<OutputLine> {157 self.rx.recv().await158 }159}160161struct WarnHandler;162impl Handler for WarnHandler {163 fn handle_line(&mut self, e: &str) {164 warn!(target: "nix", "{e}")165 }166}167168impl NixSessionInner {169 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170 let mut cmd = Command::new("nix");171 cmd.arg("repl")172 .arg(flake)173 .arg("--log-format")174 .arg("internal-json");175 for arg in extra_args {176 cmd.arg(arg);177 }178 cmd.stdin(Stdio::piped());179 cmd.stdout(Stdio::piped());180 cmd.stderr(Stdio::piped());181 let cmd = cmd.spawn()?;182 let stdout = cmd.stdout.unwrap();183 let stderr = cmd.stderr.unwrap();184 let mut out = OutputHandler::new(stdout, stderr);185 let mut stdin = cmd.stdin.unwrap();186 // Standard repl hello doesn't work with internal-json logger187 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188 stdin.write_all(b"\n").await?;189 stdin.flush().await?;190 let nix_handler = NixHandler::default();191 let mut full_delimiter = None;192 let mut errors = vec![];193 while let Some(line) = out.next().await {194 let line = match line {195 OutputLine::Out(o) => o,196 OutputLine::Err(_e) => {197 // Handle startup errors, but skip repl hello?198 errors.push(_e);199 continue;200 }201 };202 if line.contains(REPL_DELIMITER) {203 debug!("discovered repl delimiter with added colors: {line}");204 full_delimiter = Some(line.to_owned());205 break;206 }207 }208 let Some(full_delimiter) = full_delimiter else {209 for e in errors {210 error!("{e}");211 }212 bail!("failed to discover delimiter");213 };214 let mut res = Self {215 full_delimiter,216 nix_handler: ClonableHandler::new(nix_handler),217 out,218 stdin,219 string_wrapping: Default::default(),220 number_wrapping: Default::default(),221222 next_id: 0,223 free_list: vec![],224 };225 res.train().await?;226 Ok(res)227 }228 async fn train(&mut self) -> Result<()> {229 {230 let full_string = self231 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232 .await?;233 let string_offset = full_string.find(TRAIN_STRING).expect("contained");234 let string_prefix = &full_string[..string_offset];235 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237 }238 {239 let full_number = self240 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241 .await?;242 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243 let number_prefix = &full_number[..number_offset];244 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246 }247 Ok(())248 }249 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250 if tracing::enabled!(Level::DEBUG) {251 let cmd_str = String::from_utf8_lossy(cmd.as_ref());252 tracing::debug!("{cmd_str}");253 };254 self.stdin.write_all(cmd.as_ref()).await?;255 self.stdin.write_all(b"\n").await?;256 Ok(())257 }258 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259 let mut out = String::new();260 while let Some(line) = self.out.next().await {261 let line = match line {262 OutputLine::Out(out) => out,263 OutputLine::Err(err) => {264 err_handler.handle_line(&err);265 continue;266 }267 };268 if line == self.full_delimiter {269 return Ok(out);270 }271 if !out.is_empty() {272 out.push('\n');273 }274 out.push_str(&line);275 }276 bail!("didn't reached delimiter");277 }278 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279 let num = self.number_wrapping.clone();280 let n = self.execute_expression_wrapping(expr, &num).await?;281 Ok(n.parse::<u64>()?)282 }283 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284 let num = self.string_wrapping.clone();285 let n = self.execute_expression_wrapping(expr, &num).await?;286 let str: String = serde_json::from_str(&n)?;287 Ok(str)288 }289 async fn execute_expression_to_json<V: DeserializeOwned>(290 &mut self,291 expr: impl AsRef<[u8]>,292 ) -> Result<V> {293 let mut fexpr = b"builtins.toJSON (".to_vec();294 fexpr.extend_from_slice(expr.as_ref());295 fexpr.push(b')');296 let v = self.execute_expression_string(fexpr).await?;297 Ok(serde_json::from_str(&v)?)298 }299 async fn execute_expression_wrapping(300 &mut self,301 expr: impl AsRef<[u8]>,302 wrapping: &(String, String),303 ) -> Result<String> {304 let mut nix_handler = self.nix_handler.clone();305 let mut collected = ErrorCollector::new(&mut nix_handler);306 let res = self.execute_expression_raw(expr, &mut collected).await?;307 if res.is_empty() {308 collected.finish()?;309 bail!("expected expression, got nothing")310 } else {311 collected.flush()312 };313 let Some(res) = res.strip_prefix(&wrapping.0) else {314 bail!("invalid type")315 };316 let Some(res) = res.strip_suffix(&wrapping.1) else {317 bail!("invalid type")318 };319 Ok(res.to_owned())320 }321 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322 let mut nix_handler = self.nix_handler.clone();323 let mut collected = ErrorCollector::new(&mut nix_handler);324 let v = self.execute_expression_raw(expr, &mut collected).await?;325 collected.finish()?;326 ensure!(v.is_empty(), "unexpected expression result");327 Ok(())328 }329 async fn execute_expression_raw(330 &mut self,331 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,333 ) -> Result<String> {334 self.send_command(expr).await?;335 // It will be echoed336 self.send_command(REPL_DELIMITER).await?;337 self.read_until_delimiter(err_handler).await338 }339 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340 let id = self.allocate_id();341 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342 .await?;343 Ok(id)344 }345346 /// Id should be immediately used347 fn allocate_id(&mut self) -> u32 {348 if let Some(free) = self.free_list.pop() {349 free350 } else {351 let v = self.next_id;352 self.next_id += 1;353 v354 }355 }356 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.357 // async fn free_id(&mut self, id: u32) -> Result<()> {358 // self.execute_expression_empty(format!("sess_field_{id} = null"))359 // .await?;360 // self.free_list.push(id);361 // Ok(())362 // }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370 out: String,371 used_fields: Vec<Field>,372}373impl NixExprBuilder {374 pub fn object() -> Self {375 NixExprBuilder {376 out: "{ ".to_owned(),377 used_fields: Vec::new(),378 }379 }380 pub fn string(s: &str) -> Self {381 NixExprBuilder {382 out: nixlike::serialize(s)383 .expect("no problems with serializing_string")384 .trim_end()385 .to_owned(),386 used_fields: Vec::new(),387 }388 }389 pub fn serialized(v: impl Serialize) -> Self {390 let serialized = nixlike::serialize(v).expect("invalid value for apply");391 Self {392 out: serialized.trim_end().to_owned(),393 used_fields: Vec::new(),394 }395 }396 pub fn field(f: Field) -> Self {397 Self {398 out: format!("sess_field_{}", f.0.value.expect("no value")),399 used_fields: vec![f],400 }401 }402 pub fn end_obj(&mut self) {403 self.out.push('}');404 }405 pub fn obj_key(&mut self, name: Self, value: Self) {406 self.out.push_str(r#""${"#);407 self.extend(name);408 self.out.push_str(r#"}" = "#);409 self.extend(value);410 self.out.push_str("; ");411 }412413 pub fn extend(&mut self, e: Self) {414 self.out.push_str(&e.out);415 self.used_fields.extend(e.used_fields);416 }417418 pub fn session(&self) -> NixSession {419 let mut session = None;420 for ele in &self.used_fields {421 if session.is_none() {422 session = Some(ele.0.session.clone());423 continue;424 }425 let session = &session.as_ref().expect("checked").0;426 let ele_sess = &ele.0.session.0;427 assert!(428 Arc::ptr_eq(session, ele_sess),429 "can't mix fields from different session"430 );431 }432 session.expect("expr without fields used")433 }434 pub fn index_attr(&mut self, s: &str) {435 let escaped = nixlike::serialize(s).expect("string");436 self.out.push('.');437 self.out.push_str(escaped.trim_end());438 }439}440441#[macro_export]442macro_rules! nix_expr_inner {443 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444 use $crate::better_nix_eval::NixExprBuilder;445 let mut out = NixExprBuilder::object();446 $(447 out.obj_key(448 NixExprBuilder::string(stringify!($ident)),449 $crate::nix_expr_inner!($($val)+),450 );451 )*452 out.end_obj();453 out454 }};455 (@field($o:ident) . $var:ident $($tt:tt)*) => {{456 $o.index_attr(stringify!($var));457 nix_expr_inner!(@field($o) $($tt)*);458 }};459 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460 $o.push(Index::attr(&$v));461 nix_expr_inner!(@o($o) $($tt)*);462 }};463 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465 nix_expr_inner!(@o($o) $($tt)*);466 }};467 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469 nix_expr_inner!(@o($o) $($tt)*);470 };471 (@field($o:ident)) => {};472 ($field:ident $($tt:tt)*) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 #[allow(unused_mut, reason = "might be used if indexed")]475 let mut out = NixExprBuilder::field($field.clone());476 nix_expr_inner!(@field(out) $($tt)*);477 out478 }};479 ($v:literal) => {{480 use $crate::better_nix_eval::NixExprBuilder;481 NixExprBuilder::string($v)482 }};483 ({$v:expr}) => {{484 use $crate::better_nix_eval::NixExprBuilder;485 NixExprBuilder::serialized(&$v)486 }}487}488#[macro_export]489macro_rules! nix_expr {490 ($($tt:tt)+) => {{491 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492 let expr = nix_expr_inner!($($tt)+);493 Field::new(expr.session(), expr.out)494 }};495}496497#[macro_export]498macro_rules! nix_go {499 (@o($o:ident) . $var:ident $($tt:tt)*) => {{500 $o.push(Index::attr(stringify!($var)));501 nix_go!(@o($o) $($tt)*);502 }};503 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504 $o.push(Index::attr(&$v));505 nix_go!(@o($o) $($tt)*);506 }};507 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509 nix_go!(@o($o) $($tt)*);510 }};511 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513 nix_go!(@o($o) $($tt)*);514 };515 (@o($o:ident)) => {};516 ($field:ident $($tt:tt)+) => {{517 use $crate::{nix_go, better_nix_eval::Index};518 let field = $field.clone();519 let mut out = vec![];520 nix_go!(@o(out) $($tt)*);521 field.select(out).await?522 }}523}524#[macro_export]525macro_rules! nix_go_json {526 ($($tt:tt)*) => {{527 $crate::nix_go!($($tt)*).as_json().await?528 }};529}530531#[derive(Clone)]532pub enum Index {533 Var(String),534 String(String),535 Apply(String),536 Expr(NixExprBuilder),537 ExprApply(NixExprBuilder),538}539impl Index {540 pub fn var(v: impl AsRef<str>) -> Self {541 let v = v.as_ref();542 assert!(543 !(v.contains('.') | v.contains(' ')),544 "bad variable name: {v}"545 );546 Self::Var(v.to_owned())547 }548 pub fn attr(v: impl AsRef<str>) -> Self {549 Self::String(v.as_ref().to_owned())550 }551 pub fn apply(v: impl Serialize) -> Self {552 let serialized = nixlike::serialize(v).expect("invalid value for apply");553 Self::Apply(serialized.trim_end().to_owned())554 }555}556impl Display for Index {557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558 match self {559 Index::Var(v) => {560 write!(f, "{v}")561 }562 Index::String(k) => {563 let v = nixlike::format_identifier(k.as_str());564 write!(f, ".{v}")565 }566 Index::Apply(o) => {567 write!(f, "<apply>({o})")568 }569 Index::Expr(e) => {570 write!(f, "[{}]", e.out)571 }572 Index::ExprApply(e) => {573 write!(f, "<apply>({})", e.out)574 }575 }576 }577}578impl fmt::Debug for Index {579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580 write!(f, "{self}")581 }582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 for i in self.0 {587 write!(f, "{i}")?;588 }589 Ok(())590 }591}592struct FieldInner {593 full_path: Option<Vec<Index>>,594 session: NixSession,595 value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598 if let Some(full_path) = &full_path {599 format!("full path: {}", PathDisplay(full_path))600 } else {601 format!("query: {query:?}")602 }603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607 fn root(session: NixSession) -> Self {608 Self(Arc::new(FieldInner {609 full_path: Some(vec![]),610 session,611 value: None,612 }))613 }614 async fn new(session: NixSession, query: &str) -> Result<Self> {615 let vid = session616 .0617 .lock()618 .await619 .execute_assign(query)620 .await621 .with_context(|| context(None, query))?;622 Ok(Self(Arc::new(FieldInner {623 full_path: None,624 session,625 value: Some(vid),626 })))627 }628 pub async fn field(session: NixSession, field: &str) -> Result<Self> {629 Self::root(session).select([Index::var(field)]).await630 }631 pub async fn get_json_deep<'a, V: DeserializeOwned>(632 &self,633 name: impl IntoIterator<Item = Index>,634 ) -> Result<V> {635 let field = self.select(name).await?;636 field.as_json().await637 }638 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {639 let mut used_fields = Vec::new();640 let mut name = name.into_iter();641642 let mut full_path = self.0.full_path.clone();643 let mut query = if let Some(id) = self.0.value {644 format!("sess_field_{id}")645 } else {646 let first = name.next();647 if let Some(Index::Var(i)) = first {648 if let Some(full_path) = &mut full_path {649 full_path.push(Index::Var(i.clone()));650 }651 i.clone()652 } else {653 panic!("first path item should be variable, got {first:?}")654 }655 };656 for v in name {657 if let Some(full_path) = &mut full_path {658 full_path.push(v.clone());659 }660 match v {661 Index::Var(_) => panic!("var item may only be first"),662 Index::String(s) => {663 let escaped = nixlike::serialize(s)?;664 query.push('.');665 query.push_str(escaped.trim());666 }667 Index::Apply(a) => {668 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`669 query = format!("({query} {a})");670 }671 Index::Expr(e) => {672 let index = Field::new(self.0.session.clone(), &e.out).await?;673 used_fields.push(index.clone());674 query.push('.');675 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));676 query.push_str(&index);677 }678 Index::ExprApply(e) => {679 let index = Field::new(self.0.session.clone(), &e.out).await?;680 used_fields.push(index.clone());681 query.push(' ');682 let index = format!("sess_field_{}", index.0.value.expect("value"));683 query.push_str(&index);684 query = format!("({query})");685 }686 }687 }688689 let vid = self690 .0691 .session692 .0693 .lock()694 .await695 .execute_assign(&query)696 .await697 .with_context(|| {698 if let Some(full_path) = &full_path {699 format!("full path: {}", PathDisplay(full_path))700 } else {701 format!("query: {query:?}")702 }703 })?;704 Ok(Self(Arc::new(FieldInner {705 full_path,706 session: self.0.session.clone(),707 value: Some(vid),708 })))709 }710 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {711 let id = self.0.value.expect("can't serialize root field");712 let query = format!("sess_field_{id}");713 self.0714 .session715 .0716 .lock()717 .await718 .execute_expression_to_json(&query)719 .await720 .with_context(|| context(self.0.full_path.as_deref(), &query))721 }722 pub async fn list_fields(&self) -> Result<Vec<String>> {723 let id = self.0.value.expect("can't list root fields");724 let query = format!("builtins.attrNames sess_field_{id}");725 self.0726 .session727 .0728 .lock()729 .await730 .execute_expression_to_json(&query)731 .await732 .with_context(|| context(self.0.full_path.as_deref(), &query))733 }734 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {735 let id = self.0.value.expect("can't use build on not-value");736 let query = format!(":b sess_field_{id}");737 let vid = self738 .0739 .session740 .0741 .lock()742 .await743 .execute_expression_raw(&query, &mut NixHandler::default())744 .await?;745 ensure!(746 !vid.is_empty(),747 "build failed: {}",748 context(self.0.full_path.as_deref(), &query),749 );750 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")751 else {752 panic!("unexpected build output: {vid:?}");753 };754 let outputs = vid755 .split('\n')756 .filter(|v| !v.is_empty())757 .map(|v| v.split_once(" -> ").expect("unexpected build output"))758 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))759 .collect();760 Ok(outputs)761 }762}763impl Drop for FieldInner {764 fn drop(&mut self) {765 if let Some(id) = self.value {766 if let Ok(mut lock) = self.session.0.try_lock() {767 lock.free_list.push(id)768 }769 // Leaked770 }771 }772}773struct NixSessionPoolInner {774 flake: OsString,775 nix_args: Vec<OsString>,776}777778#[derive(Debug)]779pub struct NixPoolError(anyhow::Error);780impl From<anyhow::Error> for NixPoolError {781 fn from(value: anyhow::Error) -> Self {782 Self(value)783 }784}785impl std::error::Error for NixPoolError {}786impl std::fmt::Display for NixPoolError {787 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {788 self.0.fmt(f)789 }790}791impl r2d2::ManageConnection for NixSessionPoolInner {792 type Connection = NixSessionInner;793 type Error = NixPoolError;794 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {795 let _v = TOKIO_RUNTIME796 .get()797 .expect("missed tokio runtime init!")798 .enter();799 Ok(futures::executor::block_on(NixSessionInner::new(800 self.flake.as_os_str(),801 self.nix_args.iter().map(OsString::as_os_str),802 ))?)803 }804805 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {806 let _v = TOKIO_RUNTIME807 .get()808 .expect("missed tokio runtime init!")809 .enter();810 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;811 if res != 4 {812 return Err(anyhow!("sanity check failed").into());813 };814 Ok(())815 }816817 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {818 false819 }820}821pub struct NixSessionPool(Pool<NixSessionPoolInner>);822impl NixSessionPool {823 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {824 let inner = tokio::task::block_in_place(|| {825 r2d2::Builder::<NixSessionPoolInner>::new()826 .min_idle(Some(0))827 .build(NixSessionPoolInner { flake, nix_args })828 })?;829 Ok(Self(inner))830 }831 pub async fn get(&self) -> Result<NixSession> {832 let v = tokio::task::block_in_place(|| self.0.get())?;833 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))834 }835}836837pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26 full_delimiter: String,27 nix_handler: ClonableHandler<NixHandler>,28 out: OutputHandler,29 stdin: ChildStdin,30 string_wrapping: (String, String),31 number_wrapping: (String, String),3233 next_id: u32,34 free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41 collected: Vec<String>,42 inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45 fn new(inner: &'i mut H) -> Self {46 Self {47 collected: vec![],48 inner,49 }50 }51}52impl<H> ErrorCollector<'_, H> {53 fn handle_line_inner(&mut self, msg: &str) -> bool {54 let Some(msg) = msg.strip_prefix("@nix ") else {55 return false;56 };57 #[derive(Deserialize)]58 struct ErrorAction {59 action: String,60 level: u32,61 msg: String,62 }63 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64 return false;65 };66 if act.action != "msg" || act.level != 0 {67 return false;68 }69 self.collected.push(act.msg);70 true71 }72 fn finish(self) -> Result<()> {73 // fn dedent(s: String) -> String {74 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75 // }76 if !self.collected.is_empty() {77 bail!(78 "{}",79 self.collected80 .iter()81 .map(|v| {82 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83 let v = unindent::unindent(f.trim_start());84 v.trim().to_owned()85 } else {86 v.to_owned()87 }88 })89 .join("\n")90 );91 }92 Ok(())93 }94 fn flush(self) {95 for line in self.collected {96 warn!("{line}");97 }98 }99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101 fn handle_line(&mut self, e: &str) {102 if self.handle_line_inner(e) {103 return;104 }105 self.inner.handle_line(e)106 }107}108109enum OutputLine {110 Out(String),111 Err(String),112}113struct OutputHandler {114 rx: mpsc::Receiver<OutputLine>,115 _cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118 fn new(out: ChildStdout, err: ChildStderr) -> Self {119 let mut out = FramedRead::new(out, LinesCodec::new());120 let mut err = FramedRead::new(err, LinesCodec::new());121 let (tx, rx) = mpsc::channel(20);122 let (mut cancelled, _cancel_handle) = oneshot::channel();123 tokio::spawn(async move {124 loop {125 select! {126 // We should receive errors earlier than synchronization127 biased;128 e = err.next() => {129 let Some(Ok(e)) = e else {130 if e.is_some() {131 error!("bad repl stderr: {e:?}");132 }133 continue;134 };135 let _ = tx.send(OutputLine::Err(e)).await;136 }137 o = out.next() => {138 let Some(Ok(o)) = o else {139 if o.is_some() {140 error!("bad repl stdout: {o:?}");141 }142 continue;143 };144 let _ = tx.send(OutputLine::Out(o)).await;145 }146 // Reader doesn't care about stdout, as this is cancelled.147 // Error still might be useful, to process leftover span closures?148 _ = cancelled.closed() => {149 break;150 }151 }152 }153 });154 Self { rx, _cancel_handle }155 }156 async fn next(&mut self) -> Option<OutputLine> {157 self.rx.recv().await158 }159}160161struct WarnHandler;162impl Handler for WarnHandler {163 fn handle_line(&mut self, e: &str) {164 warn!(target: "nix", "{e}")165 }166}167168impl NixSessionInner {169 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170 let mut cmd = Command::new("nix");171 cmd.arg("repl")172 .arg(flake)173 .arg("--log-format")174 .arg("internal-json");175 for arg in extra_args {176 cmd.arg(arg);177 }178 cmd.stdin(Stdio::piped());179 cmd.stdout(Stdio::piped());180 cmd.stderr(Stdio::piped());181 let cmd = cmd.spawn()?;182 let stdout = cmd.stdout.unwrap();183 let stderr = cmd.stderr.unwrap();184 let mut out = OutputHandler::new(stdout, stderr);185 let mut stdin = cmd.stdin.unwrap();186 // Standard repl hello doesn't work with internal-json logger187 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188 stdin.write_all(b"\n").await?;189 stdin.flush().await?;190 let nix_handler = NixHandler::default();191 let mut full_delimiter = None;192 let mut errors = vec![];193 while let Some(line) = out.next().await {194 let line = match line {195 OutputLine::Out(o) => o,196 OutputLine::Err(_e) => {197 // Handle startup errors, but skip repl hello?198 errors.push(_e);199 continue;200 }201 };202 if line.contains(REPL_DELIMITER) {203 debug!("discovered repl delimiter with added colors: {line}");204 full_delimiter = Some(line.to_owned());205 break;206 }207 }208 let Some(full_delimiter) = full_delimiter else {209 for e in errors {210 error!("{e}");211 }212 bail!("failed to discover delimiter");213 };214 let mut res = Self {215 full_delimiter,216 nix_handler: ClonableHandler::new(nix_handler),217 out,218 stdin,219 string_wrapping: Default::default(),220 number_wrapping: Default::default(),221222 next_id: 0,223 free_list: vec![],224 };225 res.train().await?;226 Ok(res)227 }228 async fn train(&mut self) -> Result<()> {229 {230 let full_string = self231 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232 .await?;233 let string_offset = full_string.find(TRAIN_STRING).expect("contained");234 let string_prefix = &full_string[..string_offset];235 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237 }238 {239 let full_number = self240 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241 .await?;242 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243 let number_prefix = &full_number[..number_offset];244 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246 }247 Ok(())248 }249 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {251 let cmd_str = String::from_utf8_lossy(cmd.as_ref());252 tracing::debug!("{cmd_str}");253 };254 self.stdin.write_all(cmd.as_ref()).await?;255 self.stdin.write_all(b"\n").await?;256 Ok(())257 }258 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259 let mut out = String::new();260 while let Some(line) = self.out.next().await {261 let line = match line {262 OutputLine::Out(out) => out,263 OutputLine::Err(err) => {264 err_handler.handle_line(&err);265 continue;266 }267 };268 if line == self.full_delimiter {269 return Ok(out);270 }271 if !out.is_empty() {272 out.push('\n');273 }274 out.push_str(&line);275 }276 bail!("didn't reached delimiter");277 }278 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279 let num = self.number_wrapping.clone();280 let n = self.execute_expression_wrapping(expr, &num).await?;281 Ok(n.parse::<u64>()?)282 }283 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284 let num = self.string_wrapping.clone();285 let n = self.execute_expression_wrapping(expr, &num).await?;286 let str: String = serde_json::from_str(&n)?;287 Ok(str)288 }289 async fn execute_expression_to_json<V: DeserializeOwned>(290 &mut self,291 expr: impl AsRef<[u8]>,292 ) -> Result<V> {293 let mut fexpr = b"builtins.toJSON (".to_vec();294 fexpr.extend_from_slice(expr.as_ref());295 fexpr.push(b')');296 let v = self.execute_expression_string(fexpr).await?;297 Ok(serde_json::from_str(&v)?)298 }299 async fn execute_expression_wrapping(300 &mut self,301 expr: impl AsRef<[u8]>,302 wrapping: &(String, String),303 ) -> Result<String> {304 let mut nix_handler = self.nix_handler.clone();305 let mut collected = ErrorCollector::new(&mut nix_handler);306 let res = self.execute_expression_raw(expr, &mut collected).await?;307 if res.is_empty() {308 collected.finish()?;309 bail!("expected expression, got nothing")310 } else {311 collected.flush()312 };313 let Some(res) = res.strip_prefix(&wrapping.0) else {314 bail!("invalid type")315 };316 let Some(res) = res.strip_suffix(&wrapping.1) else {317 bail!("invalid type")318 };319 Ok(res.to_owned())320 }321 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322 let mut nix_handler = self.nix_handler.clone();323 let mut collected = ErrorCollector::new(&mut nix_handler);324 let v = self.execute_expression_raw(expr, &mut collected).await?;325 collected.finish()?;326 ensure!(v.is_empty(), "unexpected expression result");327 Ok(())328 }329 async fn execute_expression_raw(330 &mut self,331 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,333 ) -> Result<String> {334 self.send_command(expr).await?;335 // It will be echoed336 self.send_command(REPL_DELIMITER).await?;337 self.read_until_delimiter(err_handler).await338 }339 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340 let id = self.allocate_id();341 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342 .await?;343 Ok(id)344 }345346 /// Id should be immediately used347 fn allocate_id(&mut self) -> u32 {348 if let Some(free) = self.free_list.pop() {349 free350 } else {351 let v = self.next_id;352 self.next_id += 1;353 v354 }355 }356 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.357 // async fn free_id(&mut self, id: u32) -> Result<()> {358 // self.execute_expression_empty(format!("sess_field_{id} = null"))359 // .await?;360 // self.free_list.push(id);361 // Ok(())362 // }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370 out: String,371 used_fields: Vec<Field>,372}373impl NixExprBuilder {374 pub fn object() -> Self {375 NixExprBuilder {376 out: "{ ".to_owned(),377 used_fields: Vec::new(),378 }379 }380 pub fn string(s: &str) -> Self {381 NixExprBuilder {382 out: nixlike::serialize(s)383 .expect("no problems with serializing_string")384 .trim_end()385 .to_owned(),386 used_fields: Vec::new(),387 }388 }389 pub fn serialized(v: impl Serialize) -> Self {390 let serialized = nixlike::serialize(v).expect("invalid value for apply");391 Self {392 out: serialized.trim_end().to_owned(),393 used_fields: Vec::new(),394 }395 }396 pub fn field(f: Field) -> Self {397 Self {398 out: format!("sess_field_{}", f.0.value.expect("no value")),399 used_fields: vec![f],400 }401 }402 pub fn end_obj(&mut self) {403 self.out.push('}');404 }405 pub fn obj_key(&mut self, name: Self, value: Self) {406 self.out.push_str(r#""${"#);407 self.extend(name);408 self.out.push_str(r#"}" = "#);409 self.extend(value);410 self.out.push_str("; ");411 }412413 pub fn extend(&mut self, e: Self) {414 self.out.push_str(&e.out);415 self.used_fields.extend(e.used_fields);416 }417418 pub fn session(&self) -> NixSession {419 let mut session = None;420 for ele in &self.used_fields {421 if session.is_none() {422 session = Some(ele.0.session.clone());423 continue;424 }425 let session = &session.as_ref().expect("checked").0;426 let ele_sess = &ele.0.session.0;427 assert!(428 Arc::ptr_eq(session, ele_sess),429 "can't mix fields from different session"430 );431 }432 session.expect("expr without fields used")433 }434 pub fn index_attr(&mut self, s: &str) {435 let escaped = nixlike::serialize(s).expect("string");436 self.out.push('.');437 self.out.push_str(escaped.trim_end());438 }439}440441#[macro_export]442macro_rules! nix_expr_inner {443 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444 use $crate::better_nix_eval::NixExprBuilder;445 let mut out = NixExprBuilder::object();446 $(447 out.obj_key(448 NixExprBuilder::string(stringify!($ident)),449 $crate::nix_expr_inner!($($val)+),450 );451 )*452 out.end_obj();453 out454 }};455 (@field($o:ident) . $var:ident $($tt:tt)*) => {{456 $o.index_attr(stringify!($var));457 nix_expr_inner!(@field($o) $($tt)*);458 }};459 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460 $o.push(Index::attr(&$v));461 nix_expr_inner!(@o($o) $($tt)*);462 }};463 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465 nix_expr_inner!(@o($o) $($tt)*);466 }};467 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469 nix_expr_inner!(@o($o) $($tt)*);470 };471 (@field($o:ident)) => {};472 ($field:ident $($tt:tt)*) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 #[allow(unused_mut, reason = "might be used if indexed")]475 let mut out = NixExprBuilder::field($field.clone());476 nix_expr_inner!(@field(out) $($tt)*);477 out478 }};479 ($v:literal) => {{480 use $crate::better_nix_eval::NixExprBuilder;481 NixExprBuilder::string($v)482 }};483 ({$v:expr}) => {{484 use $crate::better_nix_eval::NixExprBuilder;485 NixExprBuilder::serialized(&$v)486 }}487}488#[macro_export]489macro_rules! nix_expr {490 ($($tt:tt)+) => {{491 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492 let expr = nix_expr_inner!($($tt)+);493 Field::new(expr.session(), expr.out)494 }};495}496497#[macro_export]498macro_rules! nix_go {499 (@o($o:ident) . $var:ident $($tt:tt)*) => {{500 $o.push(Index::attr(stringify!($var)));501 nix_go!(@o($o) $($tt)*);502 }};503 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504 $o.push(Index::attr(&$v));505 nix_go!(@o($o) $($tt)*);506 }};507 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509 nix_go!(@o($o) $($tt)*);510 }};511 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513 nix_go!(@o($o) $($tt)*);514 };515 (@o($o:ident)) => {};516 ($field:ident $($tt:tt)+) => {{517 use $crate::{nix_go, better_nix_eval::Index};518 let field = $field.clone();519 let mut out = vec![];520 nix_go!(@o(out) $($tt)*);521 field.select(out).await?522 }}523}524#[macro_export]525macro_rules! nix_go_json {526 ($($tt:tt)*) => {{527 $crate::nix_go!($($tt)*).as_json().await?528 }};529}530531#[derive(Clone)]532pub enum Index {533 Var(String),534 String(String),535 Apply(String),536 Expr(NixExprBuilder),537 ExprApply(NixExprBuilder),538}539impl Index {540 pub fn var(v: impl AsRef<str>) -> Self {541 let v = v.as_ref();542 assert!(543 !(v.contains('.') | v.contains(' ')),544 "bad variable name: {v}"545 );546 Self::Var(v.to_owned())547 }548 pub fn attr(v: impl AsRef<str>) -> Self {549 Self::String(v.as_ref().to_owned())550 }551 pub fn apply(v: impl Serialize) -> Self {552 let serialized = nixlike::serialize(v).expect("invalid value for apply");553 Self::Apply(serialized.trim_end().to_owned())554 }555}556impl Display for Index {557 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558 match self {559 Index::Var(v) => {560 write!(f, "{v}")561 }562 Index::String(k) => {563 let v = nixlike::format_identifier(k.as_str());564 write!(f, ".{v}")565 }566 Index::Apply(o) => {567 write!(f, "<apply>({o})")568 }569 Index::Expr(e) => {570 write!(f, "[{}]", e.out)571 }572 Index::ExprApply(e) => {573 write!(f, "<apply>({})", e.out)574 }575 }576 }577}578impl fmt::Debug for Index {579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580 write!(f, "{self}")581 }582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 for i in self.0 {587 write!(f, "{i}")?;588 }589 Ok(())590 }591}592struct FieldInner {593 full_path: Option<Vec<Index>>,594 session: NixSession,595 value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598 if let Some(full_path) = &full_path {599 format!("full path: {}", PathDisplay(full_path))600 } else {601 format!("query: {query:?}")602 }603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607 fn root(session: NixSession) -> Self {608 Self(Arc::new(FieldInner {609 full_path: Some(vec![]),610 session,611 value: None,612 }))613 }614 async fn new(session: NixSession, query: &str) -> Result<Self> {615 let vid = session616 .0617 .lock()618 .await619 .execute_assign(query)620 .await621 .with_context(|| context(None, query))?;622 Ok(Self(Arc::new(FieldInner {623 full_path: None,624 session,625 value: Some(vid),626 })))627 }628 pub async fn field(session: NixSession, field: &str) -> Result<Self> {629 Self::root(session).select([Index::var(field)]).await630 }631 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {632 let mut used_fields = Vec::new();633 let mut name = name.into_iter();634635 let mut full_path = self.0.full_path.clone();636 let mut query = if let Some(id) = self.0.value {637 format!("sess_field_{id}")638 } else {639 let first = name.next();640 if let Some(Index::Var(i)) = first {641 if let Some(full_path) = &mut full_path {642 full_path.push(Index::Var(i.clone()));643 }644 i.clone()645 } else {646 panic!("first path item should be variable, got {first:?}")647 }648 };649 for v in name {650 if let Some(full_path) = &mut full_path {651 full_path.push(v.clone());652 }653 match v {654 Index::Var(_) => panic!("var item may only be first"),655 Index::String(s) => {656 let escaped = nixlike::serialize(s)?;657 query.push('.');658 query.push_str(escaped.trim());659 }660 Index::Apply(a) => {661 // In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`662 query = format!("({query} {a})");663 }664 Index::Expr(e) => {665 let index = Field::new(self.0.session.clone(), &e.out).await?;666 used_fields.push(index.clone());667 query.push('.');668 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));669 query.push_str(&index);670 }671 Index::ExprApply(e) => {672 let index = Field::new(self.0.session.clone(), &e.out).await?;673 used_fields.push(index.clone());674 query.push(' ');675 let index = format!("sess_field_{}", index.0.value.expect("value"));676 query.push_str(&index);677 query = format!("({query})");678 }679 }680 }681682 let vid = self683 .0684 .session685 .0686 .lock()687 .await688 .execute_assign(&query)689 .await690 .with_context(|| {691 if let Some(full_path) = &full_path {692 format!("full path: {}", PathDisplay(full_path))693 } else {694 format!("query: {query:?}")695 }696 })?;697 Ok(Self(Arc::new(FieldInner {698 full_path,699 session: self.0.session.clone(),700 value: Some(vid),701 })))702 }703 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {704 let id = self.0.value.expect("can't serialize root field");705 let query = format!("sess_field_{id}");706 self.0707 .session708 .0709 .lock()710 .await711 .execute_expression_to_json(&query)712 .await713 .with_context(|| context(self.0.full_path.as_deref(), &query))714 }715 pub async fn has_field(&self, name: &str) -> Result<bool> {716 let id = self.0.value.expect("can't list root fields");717 let key = nixlike::escape_string(name);718 let query = format!("sess_field_{id} ? {key}");719 self.0720 .session721 .0722 .lock()723 .await724 .execute_expression_to_json(&query)725 .await726 .with_context(|| context(self.0.full_path.as_deref(), &query))727 }728 pub async fn list_fields(&self) -> Result<Vec<String>> {729 let id = self.0.value.expect("can't list root fields");730 let query = format!("builtins.attrNames sess_field_{id}");731 self.0732 .session733 .0734 .lock()735 .await736 .execute_expression_to_json(&query)737 .await738 .with_context(|| context(self.0.full_path.as_deref(), &query))739 }740 pub async fn type_of(&self) -> Result<String> {741 let id = self.0.value.expect("can't list root fields");742 let query = format!("builtins.typeOf sess_field_{id}");743 self.0744 .session745 .0746 .lock()747 .await748 .execute_expression_to_json(&query)749 .await750 .with_context(|| context(self.0.full_path.as_deref(), &query))751 }752 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {753 let id = self.0.value.expect("can't use build on not-value");754 let query = format!(":b sess_field_{id}");755 let vid = self756 .0757 .session758 .0759 .lock()760 .await761 .execute_expression_raw(&query, &mut NixHandler::default())762 .await?;763 ensure!(764 !vid.is_empty(),765 "build failed: {}",766 context(self.0.full_path.as_deref(), &query),767 );768 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")769 else {770 panic!("unexpected build output: {vid:?}");771 };772 let outputs = vid773 .split('\n')774 .filter(|v| !v.is_empty())775 .map(|v| v.split_once(" -> ").expect("unexpected build output"))776 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))777 .collect();778 Ok(outputs)779 }780}781impl Drop for FieldInner {782 fn drop(&mut self) {783 if let Some(id) = self.value {784 if let Ok(mut lock) = self.session.0.try_lock() {785 lock.free_list.push(id)786 }787 // Leaked788 }789 }790}791struct NixSessionPoolInner {792 flake: OsString,793 nix_args: Vec<OsString>,794}795796#[derive(Debug)]797pub struct NixPoolError(anyhow::Error);798impl From<anyhow::Error> for NixPoolError {799 fn from(value: anyhow::Error) -> Self {800 Self(value)801 }802}803impl std::error::Error for NixPoolError {}804impl std::fmt::Display for NixPoolError {805 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {806 self.0.fmt(f)807 }808}809impl r2d2::ManageConnection for NixSessionPoolInner {810 type Connection = NixSessionInner;811 type Error = NixPoolError;812 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {813 let _v = TOKIO_RUNTIME814 .get()815 .expect("missed tokio runtime init!")816 .enter();817 Ok(futures::executor::block_on(NixSessionInner::new(818 self.flake.as_os_str(),819 self.nix_args.iter().map(OsString::as_os_str),820 ))?)821 }822823 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {824 let _v = TOKIO_RUNTIME825 .get()826 .expect("missed tokio runtime init!")827 .enter();828 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;829 if res != 4 {830 return Err(anyhow!("sanity check failed").into());831 };832 Ok(())833 }834835 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {836 false837 }838}839pub struct NixSessionPool(Pool<NixSessionPoolInner>);840impl NixSessionPool {841 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {842 let inner = tokio::task::block_in_place(|| {843 r2d2::Builder::<NixSessionPoolInner>::new()844 .min_idle(Some(0))845 .build(NixSessionPoolInner { flake, nix_args })846 })?;847 Ok(Self(inner))848 }849 pub async fn get(&self) -> Result<NixSession> {850 let v = tokio::task::block_in_place(|| self.0.get())?;851 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))852 }853}854855pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
use crate::{
- command::MyCommand,
- fleetdata::{FleetSecret, FleetSharedSecret},
+ better_nix_eval::Field,
+ fleetdata::{FleetSecret, FleetSharedSecret, SecretData},
host::Config,
nix_go, nix_go_json,
};
@@ -8,16 +8,16 @@
use chrono::{DateTime, Utc};
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
collections::HashSet,
io::{self, Cursor, Read},
path::PathBuf,
- sync::Arc,
};
use tabled::{Table, Tabled};
use tokio::fs::read_to_string;
-use tracing::{error, info, info_span, warn};
+use tracing::{info, info_span, warn};
#[derive(Parser)]
pub enum Secret {
@@ -90,82 +90,153 @@
prefer_identities: Vec<String>,
},
List {},
- InvokeGenerator,
}
-impl Secret {
- pub async fn run(self, config: &Config) -> Result<()> {
- match self {
- Secret::InvokeGenerator => {
- let config_field = &config.config_unchecked_field;
+async fn generate_shared(
+ config: &Config,
+ display_name: &str,
+ secret: Field,
+) -> Result<FleetSharedSecret> {
+ Ok(if secret.has_field("generateImpure").await? {
+ let config_field = &config.config_unchecked_field;
+ let generate = nix_go!(secret.generateImpure);
+ let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
- let secret =
- nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
- let generate_impure = nix_go!(secret.generateImpure);
- let on = nix_go!(generate_impure.on);
- let call_package = nix_go!(
- config_field.buildableSystems(Obj {
- localSystem: { config.local_system.clone() }
- })[on]
- .config
- .nixpkgs
- .resolvedPkgs
- .callPackage
- );
- let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
- let built = &generator.build().await?["out"];
- let mut nix = MyCommand::new("nix");
- let on: String = on.as_json().await?;
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{on}"))
- .arg(built);
- nix.run_nix().await?;
+ let on: String = nix_go_json!(generate.on);
+ let call_package = nix_go!(
+ config_field.buildableSystems(Obj {
+ localSystem: { config.local_system.clone() }
+ })[{ on }]
+ .config
+ .nixpkgs
+ .resolvedPkgs
+ .callPackage
+ );
- let session = config.host(&on).await?;
+ let host = config.host(&on).await?;
- let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
- dbg!(&owners);
+ let generator = nix_go!(call_package(generate.generator)(Obj {}));
+ let generator = generator.build().await?;
+ let generator = generator
+ .get("out")
+ .ok_or_else(|| anyhow!("missing generateImpure out"))?;
+ let generator = host.remote_derivation(generator).await?;
- let mut recipients = String::new();
- for owner in owners {
- let key = config.key(&owner).await?;
- recipients.push_str(&format!("-r \"{key}\" "));
- }
- recipients.push_str("-e");
+ let mut recipients = String::new();
+ for owner in &owners {
+ let key = config.key(owner).await?;
+ recipients.push_str(&format!("-r \"{key}\" "));
+ }
+ recipients.push_str("-e");
- // FIXME: security: created directory might be accessible to other users
- // This shouldn't be much of a concern, as data is encrypted right after creation, yet
- // still better to have.
- let tempdir = session.mktemp_dir().await?;
+ let out = host.mktemp_dir().await?;
- let mut gen = session.cmd(built).await?;
- gen.env("rageArgs", recipients).env("out", &tempdir);
- gen.run().await?;
+ let mut gen = host.cmd(generator).await?;
+ gen.env("rageArgs", recipients).env("out", &out);
+ gen.run().await?;
- {
- let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
- ensure!(marker == "SUCCESS", "generation not succeeded");
- }
+ {
+ let marker = host.read_file_text(format!("{out}/marker")).await?;
+ ensure!(marker == "SUCCESS", "generation not succeeded");
+ }
- let public = session
- .read_file_bin(format!("{tempdir}/public"))
- .await
- .ok();
- let secret = session
- .read_file_bin(format!("{tempdir}/secret"))
- .await
- .ok();
- if let Some(secret) = &secret {
- ensure!(
- age::Decryptor::new(Cursor::new(&secret)).is_ok(),
- "builder produced non-encrypted value as secret, this is highly insecure"
- );
- }
- dbg!(&secret);
- // // .as_json().await?;
- // dbg!(&built);
- }
+ let public = host.read_file_text(format!("{out}/public")).await.ok();
+ let secret = host.read_file_bin(format!("{out}/secret")).await.ok();
+ if let Some(secret) = &secret {
+ ensure!(
+ age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+ "builder produced non-encrypted value as secret, this is highly insecure"
+ );
+ }
+
+ let created_at = host.read_file_value(format!("{out}/created_at")).await?;
+ let expires_at = host.read_file_value(format!("{out}/expires_at")).await.ok();
+
+ FleetSharedSecret {
+ owners,
+ secret: FleetSecret {
+ created_at,
+ expires_at,
+ public,
+ secret: secret.map(SecretData),
+ },
+ }
+ } else {
+ bail!("no generator defined for {display_name}")
+ })
+}
+
+async fn parse_public(
+ public: Option<String>,
+ public_file: Option<PathBuf>,
+) -> Result<Option<String>> {
+ Ok(match (public, public_file) {
+ (Some(v), None) => Some(v),
+ (None, Some(v)) => Some(read_to_string(v).await?),
+ (Some(_), Some(_)) => {
+ bail!("only public or public_file should be set")
+ }
+ (None, None) => None,
+ })
+}
+
+fn parse_machines(
+ initial: Vec<String>,
+ machines: Option<Vec<String>>,
+ mut add_machines: Vec<String>,
+ mut remove_machines: Vec<String>,
+) -> Result<Vec<String>> {
+ if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {
+ bail!("no operation");
+ }
+
+ let initial_machines = initial.clone();
+ let mut target_machines = initial;
+ info!("Currently encrypted for {initial_machines:?}");
+
+ // ensure!(machines.is_some() || !add_machines.is_empty() || )
+ if let Some(machines) = machines {
+ ensure!(
+ add_machines.is_empty() && remove_machines.is_empty(),
+ "can't combine --machines and --add-machines/--remove-machines"
+ );
+ let target = initial_machines.iter().collect::<HashSet<_>>();
+ let source = machines.iter().collect::<HashSet<_>>();
+ for removed in target.difference(&source) {
+ remove_machines.push((*removed).clone());
+ }
+ for added in source.difference(&target) {
+ add_machines.push((*added).clone());
+ }
+ }
+
+ for machine in &remove_machines {
+ let mut removed = false;
+ while let Some(pos) = target_machines.iter().position(|m| m == machine) {
+ target_machines.swap_remove(pos);
+ removed = true;
+ }
+ if !removed {
+ warn!("secret is not enabled for {machine}");
+ }
+ }
+ for machine in &add_machines {
+ if target_machines.iter().any(|m| m == machine) {
+ warn!("secret is already added to {machine}");
+ } else {
+ target_machines.push(machine.to_owned());
+ }
+ }
+ if !remove_machines.is_empty() {
+ // TODO: maybe force secret regeneration?
+ // Not that useful without revokation.
+ warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
+ }
+ Ok(target_machines)
+}
+impl Secret {
+ pub async fn run(self, config: &Config) -> Result<()> {
+ match self {
Secret::ForceKeys => {
for host in config.list_hosts().await? {
if config.should_skip(&host.name) {
@@ -199,9 +270,8 @@
machines = shared.owners;
}
- let recipients = futures::stream::iter(machines.iter())
- .then(|m| config.recipient(m))
- .try_collect::<Vec<_>>()
+ let recipients = config
+ .recipients(&machines.iter().map(String::as_str).collect_vec())
.await?;
let secret = {
@@ -209,22 +279,15 @@
io::stdin().read_to_end(&mut input)?;
if input.is_empty() {
- input
+ None
} else {
- let mut encrypted = vec![];
- let recipients = recipients
- .iter()
- .cloned()
- .map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)
- .collect();
- let mut encryptor = age::Encryptor::with_recipients(recipients)
- .ok_or_else(|| anyhow!("no recipients provided"))?
- .wrap_output(&mut encrypted)?;
- io::copy(&mut Cursor::new(input), &mut encryptor)?;
- encryptor.finish()?;
- encrypted
+ Some(
+ SecretData::encrypt(recipients, input)
+ .ok_or_else(|| anyhow!("no recipients provided"))?,
+ )
}
};
+ let public = parse_public(public, public_file).await?;
config.replace_shared(
name,
FleetSharedSecret {
@@ -233,14 +296,7 @@
created_at: Utc::now(),
expires_at,
secret,
- public: match (public, public_file) {
- (Some(v), None) => Some(v),
- (None, Some(v)) => Some(read_to_string(v).await?),
- (Some(_), Some(_)) => {
- bail!("only public or public_file should be set")
- }
- (None, None) => None,
- },
+ public,
},
},
);
@@ -261,19 +317,14 @@
bail!("no data provided")
}
- let mut encrypted = vec![];
- let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;
- let mut encryptor = age::Encryptor::with_recipients(vec![recipient])
- .expect("recipients provided")
- .wrap_output(&mut encrypted)?;
- io::copy(&mut Cursor::new(input), &mut encryptor)?;
- encryptor.finish()?;
- encrypted
+ Some(SecretData::encrypt(vec![recipient], input).expect("recipient provided"))
};
if config.has_secret(&machine, &name) && !force {
bail!("secret already defined");
}
+ let public = parse_public(public, public_file).await?;
+
config.insert_secret(
&machine,
name,
@@ -281,16 +332,10 @@
created_at: Utc::now(),
expires_at: None,
secret,
- public: match (public, public_file) {
- (Some(v), None) => Some(v),
- (None, Some(v)) => Some(std::fs::read_to_string(v)?),
- (Some(_), Some(_)) => bail!("only public or public_file should be set"),
- (None, None) => None,
- },
+ public,
},
);
}
- // TODO: Instead of using sudo, decode secret on remote machine
#[allow(clippy::await_holding_refcell_ref)]
Secret::Read {
name,
@@ -298,11 +343,11 @@
plaintext,
} => {
let secret = config.host_secret(&machine, &name)?;
- if secret.secret.is_empty() {
+ let Some(secret) = secret.secret else {
bail!("no secret {name}");
- }
+ };
let host = config.host(&machine).await?;
- let data = host.decrypt(secret.secret).await?;
+ let data = host.decrypt(secret).await?;
if plaintext {
let s = String::from_utf8(data).context("output is not utf8")?;
print!("{s}");
@@ -313,59 +358,22 @@
Secret::UpdateShared {
name,
machines,
- mut add_machines,
- mut remove_machines,
+ add_machines,
+ remove_machines,
prefer_identities,
} => {
- if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {
- bail!("no operation");
- }
-
let mut secret = config.shared_secret(&name)?;
- if secret.secret.secret.is_empty() {
+ if secret.secret.secret.is_none() {
bail!("no secret");
}
let initial_machines = secret.owners.clone();
- let mut target_machines = secret.owners.clone();
- info!("Currently encrypted for {initial_machines:?}");
-
- // ensure!(machines.is_some() || !add_machines.is_empty() || )
- if let Some(machines) = machines {
- ensure!(
- add_machines.is_empty() && remove_machines.is_empty(),
- "can't combine --machines and --add-machines/--remove-machines"
- );
- let target = initial_machines.iter().collect::<HashSet<_>>();
- let source = machines.iter().collect::<HashSet<_>>();
- for removed in target.difference(&source) {
- remove_machines.push((*removed).clone());
- }
- for added in source.difference(&target) {
- add_machines.push((*added).clone());
- }
- }
-
- for machine in &remove_machines {
- let mut removed = false;
- while let Some(pos) = target_machines.iter().position(|m| m == machine) {
- target_machines.swap_remove(pos);
- removed = true;
- }
- if !removed {
- warn!("secret is not enabled for {machine}");
- }
- }
- for machine in &add_machines {
- if target_machines.iter().any(|m| m == machine) {
- warn!("secret is already added to {machine}");
- } else {
- target_machines.push(machine.to_owned());
- }
- }
- if !remove_machines.is_empty() {
- warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
- }
+ let target_machines = parse_machines(
+ initial_machines.clone(),
+ machines,
+ add_machines,
+ remove_machines,
+ )?;
if target_machines.is_empty() {
info!("no machines left for secret, removing it");
@@ -395,12 +403,14 @@
let target_recipients =
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)
- .await?;
+ if let Some(data) = secret.secret.secret {
+ let encrypted = config
+ .reencrypt_on_host(identity_holder, data, target_recipients)
+ .await?;
+ secret.secret.secret = Some(encrypted);
+ }
secret.owners = target_machines;
- secret.secret.secret = encrypted;
config.replace_shared(name, secret);
}
Secret::Regenerate { prefer_identities } => {
@@ -412,14 +422,20 @@
.collect::<HashSet<_>>();
let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();
for removed in expected_shared_set.difference(&shared_set) {
- error!("secret needs to be generated: {removed}")
+ info!("generating secret: {removed}");
+ let config_field = &config.config_unchecked_field;
+ let config_field = nix_go!(config_field.configUnchecked);
+ let secret = nix_go!(config_field.sharedSecrets[{ removed }]);
+ let shared = generate_shared(config, removed, secret).await?;
+ config.replace_shared(removed.to_string(), shared)
}
}
let mut to_remove = Vec::new();
for name in &config.list_shared() {
info!("updating secret: {name}");
let mut data = config.shared_secret(name)?;
- let config_field = &config.config_field;
+ let config_field = &config.config_unchecked_field;
+ let config_field = nix_go!(config_field.configUnchecked);
let expected_owners: Vec<String> =
nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);
if expected_owners.is_empty() {
@@ -430,50 +446,52 @@
let set = data.owners.iter().collect::<HashSet<_>>();
let expected_set = expected_owners.iter().collect::<HashSet<_>>();
let should_remove = set.difference(&expected_set).next().is_some();
- if set != expected_set {
- let owner_dependent: bool =
- nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);
- if !owner_dependent {
- warn!("reencrypting secret '{name}' for new owner set");
- // TODO: force regeneration
- if should_remove {
- warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
- }
+ if set == expected_set {
+ info!("secret data is ok");
+ continue;
+ }
+
+ let secret = nix_go!(config_field.sharedSecrets[{ name }]);
+ let owner_dependent: bool = nix_go_json!(secret.ownerDependent);
+ let regenerate_on_remove: bool = nix_go_json!(secret.regenerateOnOwnerRemoved);
+ #[allow(clippy::nonminimal_bool)]
+ if !owner_dependent && !(should_remove && regenerate_on_remove) {
+ warn!("reencrypting secret '{name}' for new owner set");
+ // TODO: force regeneration
+ if should_remove {
+ warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
+ }
- let identity_holder = if !prefer_identities.is_empty() {
- prefer_identities
- .iter()
- .find(|i| data.owners.iter().any(|s| s == *i))
- } else {
- data.owners.first()
- };
- let Some(identity_holder) = identity_holder else {
- bail!("no available holder found");
- };
+ let identity_holder = if !prefer_identities.is_empty() {
+ prefer_identities
+ .iter()
+ .find(|i| data.owners.iter().any(|s| s == *i))
+ } else {
+ data.owners.first()
+ };
+ let Some(identity_holder) = identity_holder else {
+ bail!("no available holder found");
+ };
- let target_recipients = futures::stream::iter(&expected_owners)
- .then(|m| async { config.key(m).await })
- .collect::<Vec<_>>()
- .await;
- let target_recipients =
- target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
+ let target_recipients = futures::stream::iter(&expected_owners)
+ .then(|m| async { config.key(m).await })
+ .collect::<Vec<_>>()
+ .await;
+ let target_recipients =
+ target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
+ if let Some(secret) = data.secret.secret {
let encrypted = config
- .reencrypt_on_host(
- identity_holder,
- data.secret.secret,
- target_recipients,
- )
+ .reencrypt_on_host(identity_holder, secret, target_recipients)
.await?;
- data.secret.secret = encrypted;
- data.owners = expected_owners;
- config.replace_shared(name.to_owned(), data);
- } else {
- error!("secret '{name}' should be regenerated manually");
+ data.secret.secret = Some(encrypted);
}
+ data.owners = expected_owners;
+ config.replace_shared(name.to_owned(), data);
} else {
- info!("secret data is ok")
+ let shared = generate_shared(config, name, secret).await?;
+ config.replace_shared(name.to_owned(), shared)
}
}
for k in to_remove {
cmds/fleet/src/fleetdata.rsdiffbeforeafterboth--- a/cmds/fleet/src/fleetdata.rs
+++ b/cmds/fleet/src/fleetdata.rs
@@ -1,8 +1,13 @@
+use age::Recipient;
use anyhow::Result;
use chrono::{DateTime, Utc};
+use itertools::Itertools;
use nixlike::format_nix;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::BTreeMap;
+use std::{
+ collections::BTreeMap,
+ io::{self, Cursor},
+};
use tempfile::TempDir;
use tokio::{
fs::{self, File},
@@ -41,6 +46,43 @@
}
#[derive(Serialize, Deserialize, Clone)]
+pub struct SecretData(
+ #[serde(
+ default,
+ skip_serializing_if = "Vec::is_empty",
+ serialize_with = "as_z85",
+ deserialize_with = "from_z85"
+ )]
+ pub Vec<u8>,
+);
+impl SecretData {
+ /// Returns None if recipients.is_empty()
+ pub fn encrypt(
+ recipients: impl IntoIterator<Item = impl Recipient + Send + 'static>,
+ data: Vec<u8>,
+ ) -> Option<Self> {
+ let mut encrypted = vec![];
+ let recipients = recipients
+ .into_iter()
+ .map(|v| Box::new(v) as Box<dyn Recipient + Send>)
+ .collect_vec();
+ let mut encryptor = age::Encryptor::with_recipients(recipients)?
+ .wrap_output(&mut encrypted)
+ .expect("in memory write");
+ io::copy(&mut Cursor::new(data), &mut encryptor).expect("in memory copy");
+ encryptor.finish().expect("in memory flush");
+ Some(Self(encrypted))
+ }
+ pub fn encode_z85(&self) -> String {
+ z85::encode(&self.0)
+ }
+ pub fn decode_z85(v: &str) -> Result<Self> {
+ let v = z85::decode(v)?;
+ Ok(Self(v))
+ }
+}
+
+#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
#[must_use]
pub struct FleetSecret {
@@ -51,13 +93,8 @@
pub expires_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub public: Option<String>,
- #[serde(
- default,
- skip_serializing_if = "Vec::is_empty",
- serialize_with = "as_z85",
- deserialize_with = "from_z85"
- )]
- pub secret: Vec<u8>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub secret: Option<SecretData>,
}
fn as_z85<S>(key: &[u8], serializer: S) -> Result<S::Ok, S::Error>
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,21 +1,25 @@
use std::{
env::current_dir,
ffi::{OsStr, OsString},
+ fmt::Display,
io::Write,
ops::Deref,
path::PathBuf,
+ str::FromStr,
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
+use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
+use serde::de::DeserializeOwned;
use tempfile::NamedTempFile;
use crate::{
better_nix_eval::{Field, NixSessionPool},
command::MyCommand,
- fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+ fleetdata::{FleetData, FleetSecret, FleetSharedSecret, SecretData},
nix_go, nix_go_json,
};
@@ -80,14 +84,25 @@
cmd.arg(path);
cmd.run_string().await
}
+ pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
+ let text = self.read_file_text(path).await?;
+ Ok(serde_json::from_str(&text)?)
+ }
+ pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>
+ where
+ <D as FromStr>::Err: Display,
+ {
+ let text = self.read_file_text(path).await?;
+ D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
+ }
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
let session = self.open_session().await?;
Ok(MyCommand::new_on(cmd, session))
}
- pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+ pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
let mut cmd = self.cmd("fleet-install-secrets").await?;
- cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+ cmd.arg("decrypt").eqarg("--secret", data.encode_z85());
let encoded = cmd
.sudo()
.run_string()
@@ -95,6 +110,16 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ /// Returns path for futureproofing, as path might change i.e on conversion to CA
+ pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ let mut nix = MyCommand::new("nix");
+ nix.arg("copy")
+ .arg("--substitute-on-destination")
+ .comparg("--to", format!("ssh-ng://{}", self.name))
+ .arg(path);
+ nix.run_nix().await?;
+ Ok(path.to_owned())
+ }
}
impl Config {
@@ -166,8 +191,10 @@
}
/// Shared secrets configured in fleet.nix or in flake
pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
- let config_field = &self.config_field;
- nix_go!(config_field.sharedSecrets).list_fields().await
+ let config_field = &self.config_unchecked_field;
+ nix_go!(config_field.configUnchecked.sharedSecrets)
+ .list_fields()
+ .await
}
/// Shared secrets configured in fleet.nix
pub fn list_shared(&self) -> Vec<String> {
@@ -203,12 +230,11 @@
pub async fn reencrypt_on_host(
&self,
host: &str,
- data: Vec<u8>,
+ data: SecretData,
targets: Vec<String>,
- ) -> Result<Vec<u8>> {
- let data = z85::encode(&data);
+ ) -> Result<SecretData> {
let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data);
+ recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
for target in targets {
recmd.eqarg("--targets", target);
}
@@ -219,7 +245,7 @@
.context("failed to call remote host for decrypt")?
.trim()
.to_owned();
- z85::decode(encoded).context("bad encoded data? outdated host?")
+ SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
@@ -240,9 +266,9 @@
Ok(secret.clone())
}
pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
- let config_field = &self.config_field;
+ let config_field = &self.config_unchecked_field;
Ok(nix_go_json!(
- config_field.sharedSecrets[{ secret }].expectedOwners
+ config_field.configUnchecked.sharedSecrets[{ secret }].expectedOwners
))
}
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -2,7 +2,9 @@
use crate::command::MyCommand;
use crate::host::Config;
+use age::Recipient;
use anyhow::{anyhow, Result};
+use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use tracing::warn;
@@ -36,11 +38,18 @@
}
}
/// Insecure, requires root
- pub async fn recipient(&self, host: &str) -> anyhow::Result<age::ssh::Recipient> {
+ pub async fn recipient(&self, host: &str) -> anyhow::Result<impl Recipient> {
let key = self.key(host).await?;
age::ssh::Recipient::from_str(&key).map_err(|e| anyhow!("parse recipient error: {:?}", e))
}
+ pub async fn recipients(&self, hosts: &[&str]) -> Result<Vec<impl Recipient>> {
+ futures::stream::iter(hosts.iter())
+ .then(|m| self.recipient(m))
+ .try_collect::<Vec<_>>()
+ .await
+ }
+
#[allow(dead_code)]
pub async fn orphaned_data(&self) -> Result<Vec<String>> {
let mut out = Vec::new();
crates/nixlike/src/lib.rsdiffbeforeafterboth--- a/crates/nixlike/src/lib.rs
+++ b/crates/nixlike/src/lib.rs
@@ -10,6 +10,8 @@
mod se_impl;
mod to_string;
+pub use to_string::escape_string;
+
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("bad number")]
crates/nixlike/src/to_string.rsdiffbeforeafterboth--- a/crates/nixlike/src/to_string.rs
+++ b/crates/nixlike/src/to_string.rs
@@ -25,8 +25,8 @@
}
}
-fn write_nix_str(str: &str, out: &mut String) {
- out.push_str(&format!(
+pub fn escape_string(str: &str) -> String {
+ format!(
"\"{}\"",
str.replace('\\', "\\\\")
.replace('"', "\\\"")
@@ -34,7 +34,11 @@
.replace('\t', "\\t")
.replace('\r', "\\r")
.replace('$', "\\$")
- ))
+ )
+}
+
+pub fn write_nix_str(str: &str, out: &mut String) {
+ out.push_str(&escape_string(str))
}
fn write_nix_buf(value: &Value, out: &mut String) {