git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
before · cmds/fleet/src/better_nix_eval.rs
1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26	full_delimiter: String,27	nix_handler: ClonableHandler<NixHandler>,28	out: OutputHandler,29	stdin: ChildStdin,30	string_wrapping: (String, String),31	number_wrapping: (String, String),3233	next_id: u32,34	free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41	collected: Vec<String>,42	inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45	fn new(inner: &'i mut H) -> Self {46		Self {47			collected: vec![],48			inner,49		}50	}51}52impl<H> ErrorCollector<'_, H> {53	fn handle_line_inner(&mut self, msg: &str) -> bool {54		let Some(msg) = msg.strip_prefix("@nix ") else {55			return false;56		};57		#[derive(Deserialize)]58		struct ErrorAction {59			action: String,60			level: u32,61			msg: String,62		}63		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64			return false;65		};66		if act.action != "msg" || act.level != 0 {67			return false;68		}69		self.collected.push(act.msg);70		true71	}72	fn finish(self) -> Result<()> {73		// fn dedent(s: String) -> String {74		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75		// }76		if !self.collected.is_empty() {77			bail!(78				"{}",79				self.collected80					.iter()81					.map(|v| {82						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83							let v = unindent::unindent(f.trim_start());84							v.trim().to_owned()85						} else {86							v.to_owned()87						}88					})89					.join("\n")90			);91		}92		Ok(())93	}94	fn flush(self) {95		for line in self.collected {96			warn!("{line}");97		}98	}99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101	fn handle_line(&mut self, e: &str) {102		if self.handle_line_inner(e) {103			return;104		}105		self.inner.handle_line(e)106	}107}108109enum OutputLine {110	Out(String),111	Err(String),112}113struct OutputHandler {114	rx: mpsc::Receiver<OutputLine>,115	_cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118	fn new(out: ChildStdout, err: ChildStderr) -> Self {119		let mut out = FramedRead::new(out, LinesCodec::new());120		let mut err = FramedRead::new(err, LinesCodec::new());121		let (tx, rx) = mpsc::channel(20);122		let (mut cancelled, _cancel_handle) = oneshot::channel();123		tokio::spawn(async move {124			loop {125				select! {126					// We should receive errors earlier than synchronization127					biased;128					e = err.next() => {129						let Some(Ok(e)) = e else {130							if e.is_some() {131								error!("bad repl stderr: {e:?}");132							}133							continue;134						};135						let _ = tx.send(OutputLine::Err(e)).await;136					}137					o = out.next() => {138						let Some(Ok(o)) = o else {139							if o.is_some() {140								error!("bad repl stdout: {o:?}");141							}142							continue;143						};144						let _ = tx.send(OutputLine::Out(o)).await;145					}146					// Reader doesn't care about stdout, as this is cancelled.147					// Error still might be useful, to process leftover span closures?148					_ = cancelled.closed() => {149						break;150					}151				}152			}153		});154		Self { rx, _cancel_handle }155	}156	async fn next(&mut self) -> Option<OutputLine> {157		self.rx.recv().await158	}159}160161struct WarnHandler;162impl Handler for WarnHandler {163	fn handle_line(&mut self, e: &str) {164		warn!(target: "nix", "{e}")165	}166}167168impl NixSessionInner {169	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170		let mut cmd = Command::new("nix");171		cmd.arg("repl")172			.arg(flake)173			.arg("--log-format")174			.arg("internal-json");175		for arg in extra_args {176			cmd.arg(arg);177		}178		cmd.stdin(Stdio::piped());179		cmd.stdout(Stdio::piped());180		cmd.stderr(Stdio::piped());181		let cmd = cmd.spawn()?;182		let stdout = cmd.stdout.unwrap();183		let stderr = cmd.stderr.unwrap();184		let mut out = OutputHandler::new(stdout, stderr);185		let mut stdin = cmd.stdin.unwrap();186		// Standard repl hello doesn't work with internal-json logger187		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188		stdin.write_all(b"\n").await?;189		stdin.flush().await?;190		let nix_handler = NixHandler::default();191		let mut full_delimiter = None;192		let mut errors = vec![];193		while let Some(line) = out.next().await {194			let line = match line {195				OutputLine::Out(o) => o,196				OutputLine::Err(_e) => {197					// Handle startup errors, but skip repl hello?198					errors.push(_e);199					continue;200				}201			};202			if line.contains(REPL_DELIMITER) {203				debug!("discovered repl delimiter with added colors: {line}");204				full_delimiter = Some(line.to_owned());205				break;206			}207		}208		let Some(full_delimiter) = full_delimiter else {209			for e in errors {210				error!("{e}");211			}212			bail!("failed to discover delimiter");213		};214		let mut res = Self {215			full_delimiter,216			nix_handler: ClonableHandler::new(nix_handler),217			out,218			stdin,219			string_wrapping: Default::default(),220			number_wrapping: Default::default(),221222			next_id: 0,223			free_list: vec![],224		};225		res.train().await?;226		Ok(res)227	}228	async fn train(&mut self) -> Result<()> {229		{230			let full_string = self231				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232				.await?;233			let string_offset = full_string.find(TRAIN_STRING).expect("contained");234			let string_prefix = &full_string[..string_offset];235			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237		}238		{239			let full_number = self240				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241				.await?;242			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243			let number_prefix = &full_number[..number_offset];244			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246		}247		Ok(())248	}249	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250		if tracing::enabled!(Level::DEBUG) {251			let cmd_str = String::from_utf8_lossy(cmd.as_ref());252			tracing::debug!("{cmd_str}");253		};254		self.stdin.write_all(cmd.as_ref()).await?;255		self.stdin.write_all(b"\n").await?;256		Ok(())257	}258	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259		let mut out = String::new();260		while let Some(line) = self.out.next().await {261			let line = match line {262				OutputLine::Out(out) => out,263				OutputLine::Err(err) => {264					err_handler.handle_line(&err);265					continue;266				}267			};268			if line == self.full_delimiter {269				return Ok(out);270			}271			if !out.is_empty() {272				out.push('\n');273			}274			out.push_str(&line);275		}276		bail!("didn't reached delimiter");277	}278	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279		let num = self.number_wrapping.clone();280		let n = self.execute_expression_wrapping(expr, &num).await?;281		Ok(n.parse::<u64>()?)282	}283	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284		let num = self.string_wrapping.clone();285		let n = self.execute_expression_wrapping(expr, &num).await?;286		let str: String = serde_json::from_str(&n)?;287		Ok(str)288	}289	async fn execute_expression_to_json<V: DeserializeOwned>(290		&mut self,291		expr: impl AsRef<[u8]>,292	) -> Result<V> {293		let mut fexpr = b"builtins.toJSON (".to_vec();294		fexpr.extend_from_slice(expr.as_ref());295		fexpr.push(b')');296		let v = self.execute_expression_string(fexpr).await?;297		Ok(serde_json::from_str(&v)?)298	}299	async fn execute_expression_wrapping(300		&mut self,301		expr: impl AsRef<[u8]>,302		wrapping: &(String, String),303	) -> Result<String> {304		let mut nix_handler = self.nix_handler.clone();305		let mut collected = ErrorCollector::new(&mut nix_handler);306		let res = self.execute_expression_raw(expr, &mut collected).await?;307		if res.is_empty() {308			collected.finish()?;309			bail!("expected expression, got nothing")310		} else {311			collected.flush()312		};313		let Some(res) = res.strip_prefix(&wrapping.0) else {314			bail!("invalid type")315		};316		let Some(res) = res.strip_suffix(&wrapping.1) else {317			bail!("invalid type")318		};319		Ok(res.to_owned())320	}321	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322		let mut nix_handler = self.nix_handler.clone();323		let mut collected = ErrorCollector::new(&mut nix_handler);324		let v = self.execute_expression_raw(expr, &mut collected).await?;325		collected.finish()?;326		ensure!(v.is_empty(), "unexpected expression result");327		Ok(())328	}329	async fn execute_expression_raw(330		&mut self,331		expr: impl AsRef<[u8]>,332		err_handler: &mut dyn Handler,333	) -> Result<String> {334		self.send_command(expr).await?;335		// It will be echoed336		self.send_command(REPL_DELIMITER).await?;337		self.read_until_delimiter(err_handler).await338	}339	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340		let id = self.allocate_id();341		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342			.await?;343		Ok(id)344	}345346	/// Id should be immediately used347	fn allocate_id(&mut self) -> u32 {348		if let Some(free) = self.free_list.pop() {349			free350		} else {351			let v = self.next_id;352			self.next_id += 1;353			v354		}355	}356	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.357	// async fn free_id(&mut self, id: u32) -> Result<()> {358	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))359	// 		.await?;360	// 	self.free_list.push(id);361	// 	Ok(())362	// }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370	out: String,371	used_fields: Vec<Field>,372}373impl NixExprBuilder {374	pub fn object() -> Self {375		NixExprBuilder {376			out: "{ ".to_owned(),377			used_fields: Vec::new(),378		}379	}380	pub fn string(s: &str) -> Self {381		NixExprBuilder {382			out: nixlike::serialize(s)383				.expect("no problems with serializing_string")384				.trim_end()385				.to_owned(),386			used_fields: Vec::new(),387		}388	}389	pub fn serialized(v: impl Serialize) -> Self {390		let serialized = nixlike::serialize(v).expect("invalid value for apply");391		Self {392			out: serialized.trim_end().to_owned(),393			used_fields: Vec::new(),394		}395	}396	pub fn field(f: Field) -> Self {397		Self {398			out: format!("sess_field_{}", f.0.value.expect("no value")),399			used_fields: vec![f],400		}401	}402	pub fn end_obj(&mut self) {403		self.out.push('}');404	}405	pub fn obj_key(&mut self, name: Self, value: Self) {406		self.out.push_str(r#""${"#);407		self.extend(name);408		self.out.push_str(r#"}" = "#);409		self.extend(value);410		self.out.push_str("; ");411	}412413	pub fn extend(&mut self, e: Self) {414		self.out.push_str(&e.out);415		self.used_fields.extend(e.used_fields);416	}417418	pub fn session(&self) -> NixSession {419		let mut session = None;420		for ele in &self.used_fields {421			if session.is_none() {422				session = Some(ele.0.session.clone());423				continue;424			}425			let session = &session.as_ref().expect("checked").0;426			let ele_sess = &ele.0.session.0;427			assert!(428				Arc::ptr_eq(session, ele_sess),429				"can't mix fields from different session"430			);431		}432		session.expect("expr without fields used")433	}434	pub fn index_attr(&mut self, s: &str) {435		let escaped = nixlike::serialize(s).expect("string");436		self.out.push('.');437		self.out.push_str(escaped.trim_end());438	}439}440441#[macro_export]442macro_rules! nix_expr_inner {443	(Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444		use $crate::better_nix_eval::NixExprBuilder;445		let mut out = NixExprBuilder::object();446		$(447			out.obj_key(448				NixExprBuilder::string(stringify!($ident)),449				$crate::nix_expr_inner!($($val)+),450			);451		)*452		out.end_obj();453		out454	}};455	(@field($o:ident) . $var:ident $($tt:tt)*) => {{456		$o.index_attr(stringify!($var));457		nix_expr_inner!(@field($o) $($tt)*);458	}};459	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460		$o.push(Index::attr(&$v));461		nix_expr_inner!(@o($o) $($tt)*);462	}};463	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465		nix_expr_inner!(@o($o) $($tt)*);466	}};467	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469		nix_expr_inner!(@o($o) $($tt)*);470	};471	(@field($o:ident)) => {};472	($field:ident $($tt:tt)*) => {{473		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474		#[allow(unused_mut, reason = "might be used if indexed")]475		let mut out = NixExprBuilder::field($field);476		nix_expr_inner!(@field(out) $($tt)*);477		out478	}};479	($v:literal) => {{480		use $crate::better_nix_eval::NixExprBuilder;481		NixExprBuilder::string($v)482	}};483	({$v:expr}) => {{484		use $crate::better_nix_eval::NixExprBuilder;485		NixExprBuilder::serialized(&$v)486	}}487}488#[macro_export]489macro_rules! nix_expr {490	($($tt:tt)+) => {{491		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492		let expr = nix_expr_inner!($($tt)+);493		Field::new(expr.session(), expr.out)494	}};495}496497#[macro_export]498macro_rules! nix_go {499	(@o($o:ident) . $var:ident $($tt:tt)*) => {{500		$o.push(Index::attr(stringify!($var)));501		nix_go!(@o($o) $($tt)*);502	}};503	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504		$o.push(Index::attr(&$v));505		nix_go!(@o($o) $($tt)*);506	}};507	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509		nix_go!(@o($o) $($tt)*);510	}};511	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513		nix_go!(@o($o) $($tt)*);514	};515	(@o($o:ident)) => {};516	($field:ident $($tt:tt)+) => {{517		use $crate::{nix_go, better_nix_eval::Index};518		let field = $field.clone();519		let mut out = vec![];520		nix_go!(@o(out) $($tt)*);521		field.select(out).await?522	}}523}524#[macro_export]525macro_rules! nix_go_json {526	($($tt:tt)*) => {{527		$crate::nix_go!($($tt)*).as_json().await?528	}};529}530531#[derive(Clone)]532pub enum Index {533	Var(String),534	String(String),535	Apply(String),536	Expr(NixExprBuilder),537	ExprApply(NixExprBuilder),538}539impl Index {540	pub fn var(v: impl AsRef<str>) -> Self {541		let v = v.as_ref();542		assert!(543			!(v.contains('.') | v.contains(' ')),544			"bad variable name: {v}"545		);546		Self::Var(v.to_owned())547	}548	pub fn attr(v: impl AsRef<str>) -> Self {549		Self::String(v.as_ref().to_owned())550	}551	pub fn apply(v: impl Serialize) -> Self {552		let serialized = nixlike::serialize(v).expect("invalid value for apply");553		Self::Apply(serialized.trim_end().to_owned())554	}555}556impl Display for Index {557	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558		match self {559			Index::Var(v) => {560				write!(f, "{v}")561			}562			Index::String(k) => {563				let v = nixlike::format_identifier(k.as_str());564				write!(f, ".{v}")565			}566			Index::Apply(o) => {567				write!(f, "<apply>({o})")568			}569			Index::Expr(e) => {570				write!(f, "[{}]", e.out)571			}572			Index::ExprApply(e) => {573				write!(f, "<apply>({})", e.out)574			}575		}576	}577}578impl fmt::Debug for Index {579	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580		write!(f, "{self}")581	}582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586		for i in self.0 {587			write!(f, "{i}")?;588		}589		Ok(())590	}591}592struct FieldInner {593	full_path: Option<Vec<Index>>,594	session: NixSession,595	value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598	if let Some(full_path) = &full_path {599		format!("full path: {}", PathDisplay(full_path))600	} else {601		format!("query: {query:?}")602	}603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607	fn root(session: NixSession) -> Self {608		Self(Arc::new(FieldInner {609			full_path: Some(vec![]),610			session,611			value: None,612		}))613	}614	async fn new(session: NixSession, query: &str) -> Result<Self> {615		let vid = session616			.0617			.lock()618			.await619			.execute_assign(query)620			.await621			.with_context(|| context(None, query))?;622		Ok(Self(Arc::new(FieldInner {623			full_path: None,624			session,625			value: Some(vid),626		})))627	}628	pub async fn field(session: NixSession, field: &str) -> Result<Self> {629		Self::root(session).select([Index::var(field)]).await630	}631	pub async fn get_json_deep<'a, V: DeserializeOwned>(632		&self,633		name: impl IntoIterator<Item = Index>,634	) -> Result<V> {635		let field = self.select(name).await?;636		field.as_json().await637	}638	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {639		let mut used_fields = Vec::new();640		let mut name = name.into_iter();641642		let mut full_path = self.0.full_path.clone();643		let mut query = if let Some(id) = self.0.value {644			format!("sess_field_{id}")645		} else {646			let first = name.next();647			if let Some(Index::Var(i)) = first {648				if let Some(full_path) = &mut full_path {649					full_path.push(Index::Var(i.clone()));650				}651				i.clone()652			} else {653				panic!("first path item should be variable, got {first:?}")654			}655		};656		for v in name {657			if let Some(full_path) = &mut full_path {658				full_path.push(v.clone());659			}660			match v {661				Index::Var(_) => panic!("var item may only be first"),662				Index::String(s) => {663					let escaped = nixlike::serialize(s)?;664					query.push('.');665					query.push_str(escaped.trim());666				}667				Index::Apply(a) => {668					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`669					query = format!("({query} {a})");670				}671				Index::Expr(e) => {672					let index = Field::new(self.0.session.clone(), &e.out).await?;673					used_fields.push(index.clone());674					query.push('.');675					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));676					query.push_str(&index);677				}678				Index::ExprApply(e) => {679					let index = Field::new(self.0.session.clone(), &e.out).await?;680					used_fields.push(index.clone());681					query.push(' ');682					let index = format!("sess_field_{}", index.0.value.expect("value"));683					query.push_str(&index);684					query = format!("({query})");685				}686			}687		}688689		let vid = self690			.0691			.session692			.0693			.lock()694			.await695			.execute_assign(&query)696			.await697			.with_context(|| {698				if let Some(full_path) = &full_path {699					format!("full path: {}", PathDisplay(full_path))700				} else {701					format!("query: {query:?}")702				}703			})?;704		Ok(Self(Arc::new(FieldInner {705			full_path,706			session: self.0.session.clone(),707			value: Some(vid),708		})))709	}710	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {711		let id = self.0.value.expect("can't serialize root field");712		let query = format!("sess_field_{id}");713		self.0714			.session715			.0716			.lock()717			.await718			.execute_expression_to_json(&query)719			.await720			.with_context(|| context(self.0.full_path.as_deref(), &query))721	}722	pub async fn list_fields(&self) -> Result<Vec<String>> {723		let id = self.0.value.expect("can't list root fields");724		let query = format!("builtins.attrNames sess_field_{id}");725		self.0726			.session727			.0728			.lock()729			.await730			.execute_expression_to_json(&query)731			.await732			.with_context(|| context(self.0.full_path.as_deref(), &query))733	}734	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {735		let id = self.0.value.expect("can't use build on not-value");736		let query = format!(":b sess_field_{id}");737		let vid = self738			.0739			.session740			.0741			.lock()742			.await743			.execute_expression_raw(&query, &mut NixHandler::default())744			.await?;745		ensure!(746			!vid.is_empty(),747			"build failed: {}",748			context(self.0.full_path.as_deref(), &query),749		);750		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")751		else {752			panic!("unexpected build output: {vid:?}");753		};754		let outputs = vid755			.split('\n')756			.filter(|v| !v.is_empty())757			.map(|v| v.split_once(" -> ").expect("unexpected build output"))758			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))759			.collect();760		Ok(outputs)761	}762}763impl Drop for FieldInner {764	fn drop(&mut self) {765		if let Some(id) = self.value {766			if let Ok(mut lock) = self.session.0.try_lock() {767				lock.free_list.push(id)768			}769			// Leaked770		}771	}772}773struct NixSessionPoolInner {774	flake: OsString,775	nix_args: Vec<OsString>,776}777778#[derive(Debug)]779pub struct NixPoolError(anyhow::Error);780impl From<anyhow::Error> for NixPoolError {781	fn from(value: anyhow::Error) -> Self {782		Self(value)783	}784}785impl std::error::Error for NixPoolError {}786impl std::fmt::Display for NixPoolError {787	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {788		self.0.fmt(f)789	}790}791impl r2d2::ManageConnection for NixSessionPoolInner {792	type Connection = NixSessionInner;793	type Error = NixPoolError;794	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {795		let _v = TOKIO_RUNTIME796			.get()797			.expect("missed tokio runtime init!")798			.enter();799		Ok(futures::executor::block_on(NixSessionInner::new(800			self.flake.as_os_str(),801			self.nix_args.iter().map(OsString::as_os_str),802		))?)803	}804805	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {806		let _v = TOKIO_RUNTIME807			.get()808			.expect("missed tokio runtime init!")809			.enter();810		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;811		if res != 4 {812			return Err(anyhow!("sanity check failed").into());813		};814		Ok(())815	}816817	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {818		false819	}820}821pub struct NixSessionPool(Pool<NixSessionPoolInner>);822impl NixSessionPool {823	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {824		let inner = tokio::task::block_in_place(|| {825			r2d2::Builder::<NixSessionPoolInner>::new()826				.min_idle(Some(0))827				.build(NixSessionPoolInner { flake, nix_args })828		})?;829		Ok(Self(inner))830	}831	pub async fn get(&self) -> Result<NixSession> {832		let v = tokio::task::block_in_place(|| self.0.get())?;833		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))834	}835}836837pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
after · cmds/fleet/src/better_nix_eval.rs
1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26	full_delimiter: String,27	nix_handler: ClonableHandler<NixHandler>,28	out: OutputHandler,29	stdin: ChildStdin,30	string_wrapping: (String, String),31	number_wrapping: (String, String),3233	next_id: u32,34	free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41	collected: Vec<String>,42	inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45	fn new(inner: &'i mut H) -> Self {46		Self {47			collected: vec![],48			inner,49		}50	}51}52impl<H> ErrorCollector<'_, H> {53	fn handle_line_inner(&mut self, msg: &str) -> bool {54		let Some(msg) = msg.strip_prefix("@nix ") else {55			return false;56		};57		#[derive(Deserialize)]58		struct ErrorAction {59			action: String,60			level: u32,61			msg: String,62		}63		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64			return false;65		};66		if act.action != "msg" || act.level != 0 {67			return false;68		}69		self.collected.push(act.msg);70		true71	}72	fn finish(self) -> Result<()> {73		// fn dedent(s: String) -> String {74		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75		// }76		if !self.collected.is_empty() {77			bail!(78				"{}",79				self.collected80					.iter()81					.map(|v| {82						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83							let v = unindent::unindent(f.trim_start());84							v.trim().to_owned()85						} else {86							v.to_owned()87						}88					})89					.join("\n")90			);91		}92		Ok(())93	}94	fn flush(self) {95		for line in self.collected {96			warn!("{line}");97		}98	}99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101	fn handle_line(&mut self, e: &str) {102		if self.handle_line_inner(e) {103			return;104		}105		self.inner.handle_line(e)106	}107}108109enum OutputLine {110	Out(String),111	Err(String),112}113struct OutputHandler {114	rx: mpsc::Receiver<OutputLine>,115	_cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118	fn new(out: ChildStdout, err: ChildStderr) -> Self {119		let mut out = FramedRead::new(out, LinesCodec::new());120		let mut err = FramedRead::new(err, LinesCodec::new());121		let (tx, rx) = mpsc::channel(20);122		let (mut cancelled, _cancel_handle) = oneshot::channel();123		tokio::spawn(async move {124			loop {125				select! {126					// We should receive errors earlier than synchronization127					biased;128					e = err.next() => {129						let Some(Ok(e)) = e else {130							if e.is_some() {131								error!("bad repl stderr: {e:?}");132							}133							continue;134						};135						let _ = tx.send(OutputLine::Err(e)).await;136					}137					o = out.next() => {138						let Some(Ok(o)) = o else {139							if o.is_some() {140								error!("bad repl stdout: {o:?}");141							}142							continue;143						};144						let _ = tx.send(OutputLine::Out(o)).await;145					}146					// Reader doesn't care about stdout, as this is cancelled.147					// Error still might be useful, to process leftover span closures?148					_ = cancelled.closed() => {149						break;150					}151				}152			}153		});154		Self { rx, _cancel_handle }155	}156	async fn next(&mut self) -> Option<OutputLine> {157		self.rx.recv().await158	}159}160161struct WarnHandler;162impl Handler for WarnHandler {163	fn handle_line(&mut self, e: &str) {164		warn!(target: "nix", "{e}")165	}166}167168impl NixSessionInner {169	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170		let mut cmd = Command::new("nix");171		cmd.arg("repl")172			.arg(flake)173			.arg("--log-format")174			.arg("internal-json");175		for arg in extra_args {176			cmd.arg(arg);177		}178		cmd.stdin(Stdio::piped());179		cmd.stdout(Stdio::piped());180		cmd.stderr(Stdio::piped());181		let cmd = cmd.spawn()?;182		let stdout = cmd.stdout.unwrap();183		let stderr = cmd.stderr.unwrap();184		let mut out = OutputHandler::new(stdout, stderr);185		let mut stdin = cmd.stdin.unwrap();186		// Standard repl hello doesn't work with internal-json logger187		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188		stdin.write_all(b"\n").await?;189		stdin.flush().await?;190		let nix_handler = NixHandler::default();191		let mut full_delimiter = None;192		let mut errors = vec![];193		while let Some(line) = out.next().await {194			let line = match line {195				OutputLine::Out(o) => o,196				OutputLine::Err(_e) => {197					// Handle startup errors, but skip repl hello?198					errors.push(_e);199					continue;200				}201			};202			if line.contains(REPL_DELIMITER) {203				debug!("discovered repl delimiter with added colors: {line}");204				full_delimiter = Some(line.to_owned());205				break;206			}207		}208		let Some(full_delimiter) = full_delimiter else {209			for e in errors {210				error!("{e}");211			}212			bail!("failed to discover delimiter");213		};214		let mut res = Self {215			full_delimiter,216			nix_handler: ClonableHandler::new(nix_handler),217			out,218			stdin,219			string_wrapping: Default::default(),220			number_wrapping: Default::default(),221222			next_id: 0,223			free_list: vec![],224		};225		res.train().await?;226		Ok(res)227	}228	async fn train(&mut self) -> Result<()> {229		{230			let full_string = self231				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232				.await?;233			let string_offset = full_string.find(TRAIN_STRING).expect("contained");234			let string_prefix = &full_string[..string_offset];235			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237		}238		{239			let full_number = self240				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241				.await?;242			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243			let number_prefix = &full_number[..number_offset];244			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246		}247		Ok(())248	}249	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250		if tracing::enabled!(Level::DEBUG) {251			let cmd_str = String::from_utf8_lossy(cmd.as_ref());252			tracing::debug!("{cmd_str}");253		};254		self.stdin.write_all(cmd.as_ref()).await?;255		self.stdin.write_all(b"\n").await?;256		Ok(())257	}258	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259		let mut out = String::new();260		while let Some(line) = self.out.next().await {261			let line = match line {262				OutputLine::Out(out) => out,263				OutputLine::Err(err) => {264					err_handler.handle_line(&err);265					continue;266				}267			};268			if line == self.full_delimiter {269				return Ok(out);270			}271			if !out.is_empty() {272				out.push('\n');273			}274			out.push_str(&line);275		}276		bail!("didn't reached delimiter");277	}278	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279		let num = self.number_wrapping.clone();280		let n = self.execute_expression_wrapping(expr, &num).await?;281		Ok(n.parse::<u64>()?)282	}283	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284		let num = self.string_wrapping.clone();285		let n = self.execute_expression_wrapping(expr, &num).await?;286		let str: String = serde_json::from_str(&n)?;287		Ok(str)288	}289	async fn execute_expression_to_json<V: DeserializeOwned>(290		&mut self,291		expr: impl AsRef<[u8]>,292	) -> Result<V> {293		let mut fexpr = b"builtins.toJSON (".to_vec();294		fexpr.extend_from_slice(expr.as_ref());295		fexpr.push(b')');296		let v = self.execute_expression_string(fexpr).await?;297		Ok(serde_json::from_str(&v)?)298	}299	async fn execute_expression_wrapping(300		&mut self,301		expr: impl AsRef<[u8]>,302		wrapping: &(String, String),303	) -> Result<String> {304		let mut nix_handler = self.nix_handler.clone();305		let mut collected = ErrorCollector::new(&mut nix_handler);306		let res = self.execute_expression_raw(expr, &mut collected).await?;307		if res.is_empty() {308			collected.finish()?;309			bail!("expected expression, got nothing")310		} else {311			collected.flush()312		};313		let Some(res) = res.strip_prefix(&wrapping.0) else {314			bail!("invalid type")315		};316		let Some(res) = res.strip_suffix(&wrapping.1) else {317			bail!("invalid type")318		};319		Ok(res.to_owned())320	}321	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322		let mut nix_handler = self.nix_handler.clone();323		let mut collected = ErrorCollector::new(&mut nix_handler);324		let v = self.execute_expression_raw(expr, &mut collected).await?;325		collected.finish()?;326		ensure!(v.is_empty(), "unexpected expression result");327		Ok(())328	}329	async fn execute_expression_raw(330		&mut self,331		expr: impl AsRef<[u8]>,332		err_handler: &mut dyn Handler,333	) -> Result<String> {334		self.send_command(expr).await?;335		// It will be echoed336		self.send_command(REPL_DELIMITER).await?;337		self.read_until_delimiter(err_handler).await338	}339	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340		let id = self.allocate_id();341		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342			.await?;343		Ok(id)344	}345346	/// Id should be immediately used347	fn allocate_id(&mut self) -> u32 {348		if let Some(free) = self.free_list.pop() {349			free350		} else {351			let v = self.next_id;352			self.next_id += 1;353			v354		}355	}356	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.357	// async fn free_id(&mut self, id: u32) -> Result<()> {358	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))359	// 		.await?;360	// 	self.free_list.push(id);361	// 	Ok(())362	// }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370	out: String,371	used_fields: Vec<Field>,372}373impl NixExprBuilder {374	pub fn object() -> Self {375		NixExprBuilder {376			out: "{ ".to_owned(),377			used_fields: Vec::new(),378		}379	}380	pub fn string(s: &str) -> Self {381		NixExprBuilder {382			out: nixlike::serialize(s)383				.expect("no problems with serializing_string")384				.trim_end()385				.to_owned(),386			used_fields: Vec::new(),387		}388	}389	pub fn serialized(v: impl Serialize) -> Self {390		let serialized = nixlike::serialize(v).expect("invalid value for apply");391		Self {392			out: serialized.trim_end().to_owned(),393			used_fields: Vec::new(),394		}395	}396	pub fn field(f: Field) -> Self {397		Self {398			out: format!("sess_field_{}", f.0.value.expect("no value")),399			used_fields: vec![f],400		}401	}402	pub fn end_obj(&mut self) {403		self.out.push('}');404	}405	pub fn obj_key(&mut self, name: Self, value: Self) {406		self.out.push_str(r#""${"#);407		self.extend(name);408		self.out.push_str(r#"}" = "#);409		self.extend(value);410		self.out.push_str("; ");411	}412413	pub fn extend(&mut self, e: Self) {414		self.out.push_str(&e.out);415		self.used_fields.extend(e.used_fields);416	}417418	pub fn session(&self) -> NixSession {419		let mut session = None;420		for ele in &self.used_fields {421			if session.is_none() {422				session = Some(ele.0.session.clone());423				continue;424			}425			let session = &session.as_ref().expect("checked").0;426			let ele_sess = &ele.0.session.0;427			assert!(428				Arc::ptr_eq(session, ele_sess),429				"can't mix fields from different session"430			);431		}432		session.expect("expr without fields used")433	}434	pub fn index_attr(&mut self, s: &str) {435		let escaped = nixlike::serialize(s).expect("string");436		self.out.push('.');437		self.out.push_str(escaped.trim_end());438	}439}440441#[macro_export]442macro_rules! nix_expr_inner {443	(Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444		use $crate::better_nix_eval::NixExprBuilder;445		let mut out = NixExprBuilder::object();446		$(447			out.obj_key(448				NixExprBuilder::string(stringify!($ident)),449				$crate::nix_expr_inner!($($val)+),450			);451		)*452		out.end_obj();453		out454	}};455	(@field($o:ident) . $var:ident $($tt:tt)*) => {{456		$o.index_attr(stringify!($var));457		nix_expr_inner!(@field($o) $($tt)*);458	}};459	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460		$o.push(Index::attr(&$v));461		nix_expr_inner!(@o($o) $($tt)*);462	}};463	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465		nix_expr_inner!(@o($o) $($tt)*);466	}};467	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469		nix_expr_inner!(@o($o) $($tt)*);470	};471	(@field($o:ident)) => {};472	($field:ident $($tt:tt)*) => {{473		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474		#[allow(unused_mut, reason = "might be used if indexed")]475		let mut out = NixExprBuilder::field($field.clone());476		nix_expr_inner!(@field(out) $($tt)*);477		out478	}};479	($v:literal) => {{480		use $crate::better_nix_eval::NixExprBuilder;481		NixExprBuilder::string($v)482	}};483	({$v:expr}) => {{484		use $crate::better_nix_eval::NixExprBuilder;485		NixExprBuilder::serialized(&$v)486	}}487}488#[macro_export]489macro_rules! nix_expr {490	($($tt:tt)+) => {{491		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492		let expr = nix_expr_inner!($($tt)+);493		Field::new(expr.session(), expr.out)494	}};495}496497#[macro_export]498macro_rules! nix_go {499	(@o($o:ident) . $var:ident $($tt:tt)*) => {{500		$o.push(Index::attr(stringify!($var)));501		nix_go!(@o($o) $($tt)*);502	}};503	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504		$o.push(Index::attr(&$v));505		nix_go!(@o($o) $($tt)*);506	}};507	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509		nix_go!(@o($o) $($tt)*);510	}};511	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513		nix_go!(@o($o) $($tt)*);514	};515	(@o($o:ident)) => {};516	($field:ident $($tt:tt)+) => {{517		use $crate::{nix_go, better_nix_eval::Index};518		let field = $field.clone();519		let mut out = vec![];520		nix_go!(@o(out) $($tt)*);521		field.select(out).await?522	}}523}524#[macro_export]525macro_rules! nix_go_json {526	($($tt:tt)*) => {{527		$crate::nix_go!($($tt)*).as_json().await?528	}};529}530531#[derive(Clone)]532pub enum Index {533	Var(String),534	String(String),535	Apply(String),536	Expr(NixExprBuilder),537	ExprApply(NixExprBuilder),538}539impl Index {540	pub fn var(v: impl AsRef<str>) -> Self {541		let v = v.as_ref();542		assert!(543			!(v.contains('.') | v.contains(' ')),544			"bad variable name: {v}"545		);546		Self::Var(v.to_owned())547	}548	pub fn attr(v: impl AsRef<str>) -> Self {549		Self::String(v.as_ref().to_owned())550	}551	pub fn apply(v: impl Serialize) -> Self {552		let serialized = nixlike::serialize(v).expect("invalid value for apply");553		Self::Apply(serialized.trim_end().to_owned())554	}555}556impl Display for Index {557	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558		match self {559			Index::Var(v) => {560				write!(f, "{v}")561			}562			Index::String(k) => {563				let v = nixlike::format_identifier(k.as_str());564				write!(f, ".{v}")565			}566			Index::Apply(o) => {567				write!(f, "<apply>({o})")568			}569			Index::Expr(e) => {570				write!(f, "[{}]", e.out)571			}572			Index::ExprApply(e) => {573				write!(f, "<apply>({})", e.out)574			}575		}576	}577}578impl fmt::Debug for Index {579	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580		write!(f, "{self}")581	}582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586		for i in self.0 {587			write!(f, "{i}")?;588		}589		Ok(())590	}591}592struct FieldInner {593	full_path: Option<Vec<Index>>,594	session: NixSession,595	value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598	if let Some(full_path) = &full_path {599		format!("full path: {}", PathDisplay(full_path))600	} else {601		format!("query: {query:?}")602	}603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607	fn root(session: NixSession) -> Self {608		Self(Arc::new(FieldInner {609			full_path: Some(vec![]),610			session,611			value: None,612		}))613	}614	async fn new(session: NixSession, query: &str) -> Result<Self> {615		let vid = session616			.0617			.lock()618			.await619			.execute_assign(query)620			.await621			.with_context(|| context(None, query))?;622		Ok(Self(Arc::new(FieldInner {623			full_path: None,624			session,625			value: Some(vid),626		})))627	}628	pub async fn field(session: NixSession, field: &str) -> Result<Self> {629		Self::root(session).select([Index::var(field)]).await630	}631	pub async fn get_json_deep<'a, V: DeserializeOwned>(632		&self,633		name: impl IntoIterator<Item = Index>,634	) -> Result<V> {635		let field = self.select(name).await?;636		field.as_json().await637	}638	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {639		let mut used_fields = Vec::new();640		let mut name = name.into_iter();641642		let mut full_path = self.0.full_path.clone();643		let mut query = if let Some(id) = self.0.value {644			format!("sess_field_{id}")645		} else {646			let first = name.next();647			if let Some(Index::Var(i)) = first {648				if let Some(full_path) = &mut full_path {649					full_path.push(Index::Var(i.clone()));650				}651				i.clone()652			} else {653				panic!("first path item should be variable, got {first:?}")654			}655		};656		for v in name {657			if let Some(full_path) = &mut full_path {658				full_path.push(v.clone());659			}660			match v {661				Index::Var(_) => panic!("var item may only be first"),662				Index::String(s) => {663					let escaped = nixlike::serialize(s)?;664					query.push('.');665					query.push_str(escaped.trim());666				}667				Index::Apply(a) => {668					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`669					query = format!("({query} {a})");670				}671				Index::Expr(e) => {672					let index = Field::new(self.0.session.clone(), &e.out).await?;673					used_fields.push(index.clone());674					query.push('.');675					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));676					query.push_str(&index);677				}678				Index::ExprApply(e) => {679					let index = Field::new(self.0.session.clone(), &e.out).await?;680					used_fields.push(index.clone());681					query.push(' ');682					let index = format!("sess_field_{}", index.0.value.expect("value"));683					query.push_str(&index);684					query = format!("({query})");685				}686			}687		}688689		let vid = self690			.0691			.session692			.0693			.lock()694			.await695			.execute_assign(&query)696			.await697			.with_context(|| {698				if let Some(full_path) = &full_path {699					format!("full path: {}", PathDisplay(full_path))700				} else {701					format!("query: {query:?}")702				}703			})?;704		Ok(Self(Arc::new(FieldInner {705			full_path,706			session: self.0.session.clone(),707			value: Some(vid),708		})))709	}710	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {711		let id = self.0.value.expect("can't serialize root field");712		let query = format!("sess_field_{id}");713		self.0714			.session715			.0716			.lock()717			.await718			.execute_expression_to_json(&query)719			.await720			.with_context(|| context(self.0.full_path.as_deref(), &query))721	}722	pub async fn list_fields(&self) -> Result<Vec<String>> {723		let id = self.0.value.expect("can't list root fields");724		let query = format!("builtins.attrNames sess_field_{id}");725		self.0726			.session727			.0728			.lock()729			.await730			.execute_expression_to_json(&query)731			.await732			.with_context(|| context(self.0.full_path.as_deref(), &query))733	}734	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {735		let id = self.0.value.expect("can't use build on not-value");736		let query = format!(":b sess_field_{id}");737		let vid = self738			.0739			.session740			.0741			.lock()742			.await743			.execute_expression_raw(&query, &mut NixHandler::default())744			.await?;745		ensure!(746			!vid.is_empty(),747			"build failed: {}",748			context(self.0.full_path.as_deref(), &query),749		);750		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")751		else {752			panic!("unexpected build output: {vid:?}");753		};754		let outputs = vid755			.split('\n')756			.filter(|v| !v.is_empty())757			.map(|v| v.split_once(" -> ").expect("unexpected build output"))758			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))759			.collect();760		Ok(outputs)761	}762}763impl Drop for FieldInner {764	fn drop(&mut self) {765		if let Some(id) = self.value {766			if let Ok(mut lock) = self.session.0.try_lock() {767				lock.free_list.push(id)768			}769			// Leaked770		}771	}772}773struct NixSessionPoolInner {774	flake: OsString,775	nix_args: Vec<OsString>,776}777778#[derive(Debug)]779pub struct NixPoolError(anyhow::Error);780impl From<anyhow::Error> for NixPoolError {781	fn from(value: anyhow::Error) -> Self {782		Self(value)783	}784}785impl std::error::Error for NixPoolError {}786impl std::fmt::Display for NixPoolError {787	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {788		self.0.fmt(f)789	}790}791impl r2d2::ManageConnection for NixSessionPoolInner {792	type Connection = NixSessionInner;793	type Error = NixPoolError;794	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {795		let _v = TOKIO_RUNTIME796			.get()797			.expect("missed tokio runtime init!")798			.enter();799		Ok(futures::executor::block_on(NixSessionInner::new(800			self.flake.as_os_str(),801			self.nix_args.iter().map(OsString::as_os_str),802		))?)803	}804805	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {806		let _v = TOKIO_RUNTIME807			.get()808			.expect("missed tokio runtime init!")809			.enter();810		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;811		if res != 4 {812			return Err(anyhow!("sanity check failed").into());813		};814		Ok(())815	}816817	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {818		false819	}820}821pub struct NixSessionPool(Pool<NixSessionPoolInner>);822impl NixSessionPool {823	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {824		let inner = tokio::task::block_in_place(|| {825			r2d2::Builder::<NixSessionPoolInner>::new()826				.min_idle(Some(0))827				.build(NixSessionPoolInner { flake, nix_args })828		})?;829		Ok(Self(inner))830	}831	pub async fn get(&self) -> Result<NixSession> {832		let v = tokio::task::block_in_place(|| self.0.get())?;833		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))834	}835}836837pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+	command::MyCommand,
 	fleetdata::{FleetSecret, FleetSharedSecret},
 	host::Config,
 	nix_go, nix_go_json,
@@ -12,6 +13,7 @@
 	collections::HashSet,
 	io::{self, Cursor, Read},
 	path::PathBuf,
+	sync::Arc,
 };
 use tabled::{Table, Tabled};
 use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
 			Secret::InvokeGenerator => {
 				let config_field = &config.config_unchecked_field;
 
-				let generate_impure =
-					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let secret =
+					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+				let generate_impure = nix_go!(secret.generateImpure);
 				let on = nix_go!(generate_impure.on);
 				let call_package = nix_go!(
 					config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
 					})[on]
 						.config
 						.nixpkgs
-						.pkgs
+						.resolvedPkgs
 						.callPackage
 				);
-				let generator = nix_go!(call_package(generate_impure.generator));
-				let built = generator.build().await?;
-				// .as_json().await?;
-				dbg!(&built);
+				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+				let built = &generator.build().await?["out"];
+				let mut nix = MyCommand::new("nix");
+				let on: String = on.as_json().await?;
+				nix.arg("copy")
+					.arg("--substitute-on-destination")
+					.comparg("--to", format!("ssh-ng://{on}"))
+					.arg(built);
+				nix.run_nix().await?;
+
+				let session = config.host(&on).await?;
+
+				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+				dbg!(&owners);
+
+				let mut recipients = String::new();
+				for owner in owners {
+					let key = config.key(&owner).await?;
+					recipients.push_str(&format!("-r \"{key}\" "));
+				}
+				recipients.push_str("-e");
+
+				// FIXME: security: created directory might be accessible to other users
+				// This shouldn't be much of a concern, as data is encrypted right after creation, yet
+				// still better to have.
+				let tempdir = session.mktemp_dir().await?;
+
+				let mut gen = session.cmd(built).await?;
+				gen.env("rageArgs", recipients).env("out", &tempdir);
+				gen.run().await?;
+
+				{
+					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+					ensure!(marker == "SUCCESS", "generation not succeeded");
+				}
+
+				let public = session
+					.read_file_bin(format!("{tempdir}/public"))
+					.await
+					.ok();
+				let secret = session
+					.read_file_bin(format!("{tempdir}/secret"))
+					.await
+					.ok();
+				if let Some(secret) = &secret {
+					ensure!(
+						age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+						"builder produced non-encrypted value as secret, this is highly insecure"
+					);
+				}
+				dbg!(&secret);
+				// // .as_json().await?;
+				// dbg!(&built);
 			}
 			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
 				if secret.secret.is_empty() {
 					bail!("no secret {name}");
 				}
-				let data = config.decrypt_on_host(&machine, secret.secret).await?;
+				let host = config.host(&machine).await?;
+				let data = host.decrypt(secret.secret).await?;
 				if plaintext {
 					let s = String::from_utf8(data).context("output is not utf8")?;
 					print!("{s}");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
 use std::{
 	collections::HashMap,
 	ffi::OsStr,
+	pin,
 	process::Stdio,
 	sync::{Arc, Mutex},
 	task::Poll,
@@ -10,7 +11,7 @@
 use futures::StreamExt;
 use itertools::Either;
 use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
 use regex::Regex;
 use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
 	ssh_session: Option<Arc<Session>>,
 }
 impl MyCommand {
+	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+		assert!(!cmd.as_ref().is_empty());
+		Self {
+			command: ostoutf8(cmd),
+			args: vec![],
+			env: vec![],
+			ssh_session: Some(session),
+		}
+	}
 	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
 		assert!(!cmd.as_ref().is_empty());
 		Self {
@@ -66,6 +76,29 @@
 		out.extend(self.args);
 		out
 	}
+
+	/// Translates environment variables into env command execution.
+	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+	///
+	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+	/// Figure out some way to transfer environment using stdio?
+	fn translate_env_into_env(self) -> Self {
+		if self.env.is_empty() {
+			return self;
+		}
+		let mut out = Self::new("env");
+		if let Some(session) = self.ssh_session {
+			out = out.ssh_session(session);
+		}
+		for (k, v) in self.env {
+			assert!(!k.contains('='));
+			out.arg(format!("{k}={v}"));
+		}
+		out.arg(self.command);
+		out.args(self.args);
+
+		out
+	}
 	fn into_string(self) -> String {
 		let mut out = String::new();
 		if !self.env.is_empty() {
@@ -98,7 +131,7 @@
 	}
 	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
 		Ok(if let Some(session) = self.ssh_session.clone() {
-			let cmd = self.into_command();
+			let cmd = self.translate_env_into_env().into_command();
 			Either::Right(
 				cmd.over_ssh(session)
 					.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
 		self.arg(value);
 		self
 	}
+	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.env
+			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+		self
+	}
 	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
 		for arg in args.into_iter() {
 			let arg = arg.as_ref();
@@ -133,9 +171,10 @@
 		}
 		self
 	}
-	pub fn sudo(self) -> Self {
+	pub fn sudo(mut self) -> Self {
 		if std::env::var_os("NO_SUDO").is_some() {
 			let mut out = Self::new("su");
+			out.ssh_session = self.ssh_session.take();
 			out.arg("-c").arg(self.into_string());
 			out
 		} else {
@@ -144,27 +183,38 @@
 			out
 		}
 	}
-	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+		self.ssh_session = Some(on);
+		self
+	}
+	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
 		let mut out = Self::new("ssh");
+		out.ssh_session = self.ssh_session.take();
 		out.arg(on).arg("--");
 		out.arg(self.into_string());
 		out
 	}
-	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
-		self.ssh_session = Some(session);
-		self
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		run_nix_inner(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		match cmd {
+			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(())
 	}
 	pub async fn run_string(self) -> Result<String> {
+		let bytes = self.run_bytes().await?;
+		Ok(String::from_utf8(bytes)?)
+	}
+	pub async fn run_bytes(self) -> Result<Vec<u8>> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		let v = match cmd {
+			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(v)
 	}
 
@@ -172,7 +222,8 @@
 		let str = self.clone().into_string();
 		let mut cmd = self.into_command();
 		cmd.arg("--log-format").arg("internal-json");
-		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+		Ok(String::from_utf8(bytes)?)
 	}
 	pub async fn run_nix(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -198,7 +249,7 @@
 	str: String,
 	cmd: Command,
 	handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
 	Ok(run_nix_inner_raw(str, cmd, true, handler, None)
 		.await?
 		.expect("has out"))
@@ -208,6 +259,24 @@
 	assert!(v.is_none());
 	Ok(())
 }
+async fn run_nix_inner_stdout_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+		.await?
+		.expect("has out"))
+}
+async fn run_nix_inner_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<()> {
+	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+	assert!(v.is_none());
+	Ok(())
+}
 
 pub trait Handler: Send {
 	fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
 	want_stdout: bool,
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
 	let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
 		}
 	}
 
-	Ok(out_buf.map(String::from_utf8).transpose()?)
+	Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+	str: String,
+	mut cmd: OwningCommand<Arc<Session>>,
+	want_stdout: bool,
+	err_handler: &mut dyn Handler,
+	mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+	cmd.stderr(openssh::Stdio::piped());
+	cmd.stdout(openssh::Stdio::piped());
+	let mut child = cmd.spawn().await?;
+	let mut stderr = child.stderr().take().unwrap();
+	let stdout = child.stdout().take().unwrap();
+	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+	let mut ob = want_stdout
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ol = (!want_stdout)
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+	// while let Some(line) = read.next().await? {}
+
+	let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+	let mut wait_future = pin::pin!(child.wait());
+	loop {
+		select! {
+			e = err.next() => {
+				if let Some(e) = e {
+					let e = e?;
+					err_handler.handle_line(&e);
+				}
+			},
+			o = ob.next() => {
+				if let Some(o) = o {
+					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+				}
+			},
+			o = ol.next() => {
+				if let Some(o) = o {
+					let o = o?;
+					if let Some(out) = out_handler.as_mut() {
+						out.handle_line(&o)
+					} else {
+						err_handler.handle_line(&o)
+					}
+					// out_handler.handle_info(&o);
+				}
+			},
+			code = &mut wait_future => {
+				let code = code?;
+				if !code.success() {
+					anyhow::bail!("command '{str}' failed with status {}", code);
+				}
+				break;
+			}
+		}
+	}
+
+	Ok(out_buf)
 }
 
 pub trait ErrorRecorder: Send {
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }