git.delta.rocks / jrsonnet / refs/commits / 2c5a4bd2d3da

difftreelog

source

cmds/fleet/src/better_nix_eval.rs24.0 KiBsourcehistory
1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28	full_delimiter: String,29	nix_handler: ClonableHandler<NixHandler>,30	out: OutputHandler,31	stdin: ChildStdin,32	string_wrapping: (String, String),33	number_wrapping: (String, String),3435	executing_command: Arc<Mutex<()>>,3637	next_id: u32,38	free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45	collected: Vec<String>,46	inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49	fn new(inner: &'i mut H) -> Self {50		Self {51			collected: vec![],52			inner,53		}54	}55}56impl<H> ErrorCollector<'_, H> {57	fn handle_line_inner(&mut self, msg: &str) -> bool {58		let Some(msg) = msg.strip_prefix("@nix ") else {59			return false;60		};61		#[derive(Deserialize)]62		struct ErrorAction {63			action: String,64			level: u32,65			msg: String,66		}67		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68			return false;69		};70		if act.action != "msg" || act.level != 0 {71			return false;72		}73		self.collected.push(act.msg);74		true75	}76	fn finish(self) -> Result<()> {77		// fn dedent(s: String) -> String {78		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79		// }80		if !self.collected.is_empty() {81			bail!(82				"{}",83				self.collected84					.iter()85					.map(|v| {86						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87							let v = unindent::unindent(f.trim_start());88							v.trim().to_owned()89						} else {90							v.to_owned()91						}92					})93					.join("\n")94			);95		}96		Ok(())97	}98	fn flush(self) {99		for line in self.collected {100			warn!("{line}");101		}102	}103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105	fn handle_line(&mut self, e: &str) {106		if self.handle_line_inner(e) {107			return;108		}109		self.inner.handle_line(e)110	}111}112113enum OutputLine {114	Out(String),115	Err(String),116}117struct OutputHandler {118	rx: mpsc::Receiver<OutputLine>,119	_cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122	fn new(out: ChildStdout, err: ChildStderr) -> Self {123		let mut out = FramedRead::new(out, LinesCodec::new());124		let mut err = FramedRead::new(err, LinesCodec::new());125		let (tx, rx) = mpsc::channel(20);126		let (mut cancelled, _cancel_handle) = oneshot::channel();127		tokio::spawn(async move {128			loop {129				select! {130					// We should receive errors earlier than synchronization131					biased;132					e = err.next() => {133						let Some(Ok(e)) = e else {134							if e.is_some() {135								error!("bad repl stderr: {e:?}");136							}137							continue;138						};139						let _ = tx.send(OutputLine::Err(e)).await;140					}141					o = out.next() => {142						let Some(Ok(o)) = o else {143							if o.is_some() {144								error!("bad repl stdout: {o:?}");145							}146							continue;147						};148						let _ = tx.send(OutputLine::Out(o)).await;149					}150					// Reader doesn't care about stdout, as this is cancelled.151					// Error still might be useful, to process leftover span closures?152					_ = cancelled.closed() => {153						break;154					}155				}156			}157		});158		Self { rx, _cancel_handle }159	}160	async fn next(&mut self) -> Option<OutputLine> {161		self.rx.recv().await162	}163}164165struct WarnHandler;166impl Handler for WarnHandler {167	fn handle_line(&mut self, e: &str) {168		warn!(target: "nix", "{e}")169	}170}171172impl NixSessionInner {173	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174		let mut cmd = Command::new("nix");175		cmd.arg("repl")176			.arg(flake)177			.arg("--log-format")178			.arg("internal-json");179		for arg in extra_args {180			cmd.arg(arg);181		}182		cmd.stdin(Stdio::piped());183		cmd.stdout(Stdio::piped());184		cmd.stderr(Stdio::piped());185		let cmd = cmd.spawn()?;186		let stdout = cmd.stdout.unwrap();187		let stderr = cmd.stderr.unwrap();188		let mut out = OutputHandler::new(stdout, stderr);189		let mut stdin = cmd.stdin.unwrap();190		// Standard repl hello doesn't work with internal-json logger191		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192		stdin.write_all(b"\n").await?;193		stdin.flush().await?;194		let nix_handler = NixHandler::default();195		let mut full_delimiter = None;196		let mut errors = vec![];197		while let Some(line) = out.next().await {198			let line = match line {199				OutputLine::Out(o) => o,200				OutputLine::Err(_e) => {201					// Handle startup errors, but skip repl hello?202					errors.push(_e);203					continue;204				}205			};206			if line.contains(REPL_DELIMITER) {207				debug!("discovered repl delimiter with added colors: {line}");208				full_delimiter = Some(line.to_owned());209				break;210			}211		}212		let Some(full_delimiter) = full_delimiter else {213			for e in errors {214				error!("{e}");215			}216			bail!("failed to discover delimiter");217		};218		let mut res = Self {219			full_delimiter,220			nix_handler: ClonableHandler::new(nix_handler),221			out,222			stdin,223			string_wrapping: Default::default(),224			number_wrapping: Default::default(),225226			executing_command: Arc::new(Mutex::new(())),227228			next_id: 0,229			free_list: vec![],230		};231		res.train().await?;232		Ok(res)233	}234	async fn train(&mut self) -> Result<()> {235		{236			let full_string = self237				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238				.await?;239			let string_offset = full_string.find(TRAIN_STRING).expect("contained");240			let string_prefix = &full_string[..string_offset];241			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243		}244		{245			let full_number = self246				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247				.await?;248			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249			let number_prefix = &full_number[..number_offset];250			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252		}253		Ok(())254	}255	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256		if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257			let cmd_str = String::from_utf8_lossy(cmd.as_ref());258			tracing::debug!("{cmd_str}");259		};260		self.stdin.write_all(cmd.as_ref()).await?;261		self.stdin.write_all(b"\n").await?;262		Ok(())263	}264	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265		let mut out = String::new();266		while let Some(line) = self.out.next().await {267			let line = match line {268				OutputLine::Out(out) => out,269				OutputLine::Err(err) => {270					err_handler.handle_line(&err);271					continue;272				}273			};274			if line == self.full_delimiter {275				return Ok(out);276			}277			if !out.is_empty() {278				out.push('\n');279			}280			out.push_str(&line);281		}282		bail!("didn't reached delimiter");283	}284	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285		let num = self.number_wrapping.clone();286		let n = self.execute_expression_wrapping(expr, &num).await?;287		Ok(n.parse::<u64>()?)288	}289	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290		let num = self.string_wrapping.clone();291		let n = self.execute_expression_wrapping(expr, &num).await?;292		let str: String = serde_json::from_str(&n)?;293		Ok(str)294	}295	async fn execute_expression_to_json<V: DeserializeOwned>(296		&mut self,297		expr: impl AsRef<[u8]>,298	) -> Result<V> {299		let mut fexpr = b"builtins.toJSON (".to_vec();300		fexpr.extend_from_slice(expr.as_ref());301		fexpr.push(b')');302		let v = self303			.execute_expression_string(fexpr)304			.await305			.context("string expression")?;306		serde_json::from_str(&v).context("json parse")307	}308	async fn execute_expression_wrapping(309		&mut self,310		expr: impl AsRef<[u8]>,311		wrapping: &(String, String),312	) -> Result<String> {313		let mut nix_handler = self.nix_handler.clone();314		let mut collected = ErrorCollector::new(&mut nix_handler);315		let res = self.execute_expression_raw(expr, &mut collected).await?;316		if res.is_empty() {317			collected.finish()?;318			bail!("expected expression, got nothing")319		} else {320			collected.flush()321		};322		let Some(res) = res.strip_prefix(&wrapping.0) else {323			bail!("invalid type")324		};325		let Some(res) = res.strip_suffix(&wrapping.1) else {326			bail!("invalid type")327		};328		Ok(res.to_owned())329	}330	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {331		let mut nix_handler = self.nix_handler.clone();332		let mut collected = ErrorCollector::new(&mut nix_handler);333		let v = self.execute_expression_raw(expr, &mut collected).await?;334		collected.finish()?;335		ensure!(v.is_empty(), "unexpected expression result");336		Ok(())337	}338	async fn execute_expression_raw(339		&mut self,340		expr: impl AsRef<[u8]>,341		err_handler: &mut dyn Handler,342	) -> Result<String> {343		// Prevent two commands from being executed in parallel, messing with each other.344		let _lock = self.executing_command.clone();345		let _guard = _lock.lock().await;346347		self.send_command(expr).await?;348		// It will be echoed349		self.send_command(REPL_DELIMITER).await?;350		self.read_until_delimiter(err_handler).await351	}352	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {353		let id = self.allocate_id();354		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))355			.await?;356		Ok(id)357	}358359	/// Id should be immediately used360	fn allocate_id(&mut self) -> u32 {361		if let Some(free) = self.free_list.pop() {362			free363		} else {364			let v = self.next_id;365			self.next_id += 1;366			v367		}368	}369	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.370	// async fn free_id(&mut self, id: u32) -> Result<()> {371	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))372	// 		.await?;373	// 	self.free_list.push(id);374	// 	Ok(())375	// }376}377378#[derive(Clone)]379pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);380381#[derive(Clone)]382pub struct NixExprBuilder {383	out: String,384	used_fields: Vec<Field>,385}386impl NixExprBuilder {387	pub fn object() -> Self {388		NixExprBuilder {389			out: "{ ".to_owned(),390			used_fields: Vec::new(),391		}392	}393	pub fn string(s: &str) -> Self {394		NixExprBuilder {395			out: nixlike::serialize(s)396				.expect("no problems with serializing_string")397				.trim_end()398				.to_owned(),399			used_fields: Vec::new(),400		}401	}402	pub fn serialized(v: impl Serialize) -> Self {403		let serialized = nixlike::serialize(v).expect("invalid value for apply");404		Self {405			out: serialized.trim_end().to_owned(),406			used_fields: Vec::new(),407		}408	}409	pub fn field(f: Field) -> Self {410		Self {411			out: format!("sess_field_{}", f.0.value.expect("no value")),412			used_fields: vec![f],413		}414	}415	pub fn end_obj(&mut self) {416		self.out.push('}');417	}418	pub fn obj_key(&mut self, name: Self, value: Self) {419		self.out.push_str(r#""${"#);420		self.extend(name);421		self.out.push_str(r#"}" = "#);422		self.extend(value);423		self.out.push_str("; ");424	}425426	pub fn extend(&mut self, e: Self) {427		self.out.push_str(&e.out);428		self.used_fields.extend(e.used_fields);429	}430431	#[allow(dead_code)]432	pub fn session(&self) -> NixSession {433		let mut session = None;434		for ele in &self.used_fields {435			if session.is_none() {436				session = Some(ele.0.session.clone());437				continue;438			}439			let session = &session.as_ref().expect("checked").0;440			let ele_sess = &ele.0.session.0;441			assert!(442				Arc::ptr_eq(session, ele_sess),443				"can't mix fields from different session"444			);445		}446		session.expect("expr without fields used")447	}448	#[allow(dead_code)]449	pub fn index_attr(&mut self, s: &str) {450		let escaped = nixlike::serialize(s).expect("string");451		self.out.push('.');452		self.out.push_str(escaped.trim_end());453	}454}455456#[macro_export]457macro_rules! nix_expr_inner {458	//(@munch_object FIXME: value should be arbitrary nix_expr_inner input... Time to write proc-macro?459	(@obj($o:ident) $field:ident, $($tt:tt)*) => {{460		$o.obj_key(461			NixExprBuilder::string(stringify!($field)),462			NixExprBuilder::field($field),463		);464		nix_expr_inner!(@obj($o) $($tt)*);465	}};466	(@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{467		$o.obj_key(468			NixExprBuilder::string(stringify!($field)),469			NixExprBuilder::serialized(&$v),470		);471		nix_expr_inner!(@obj($o) $($tt)*);472	}};473	(@obj($o:ident)) => {{}};474	(Obj { $($tt:tt)* }) => {{475		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};476		let mut out = NixExprBuilder::object();477		nix_expr_inner!(@obj(out) $($tt)*);478		out.end_obj();479		out480	}};481	(@field($o:ident) . $var:ident $($tt:tt)*) => {{482		$o.index_attr(stringify!($var));483		nix_expr_inner!(@field($o) $($tt)*);484	}};485	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{486		$o.push(Index::attr(&$v));487		nix_expr_inner!(@o($o) $($tt)*);488	}};489	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{490		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));491		nix_expr_inner!(@o($o) $($tt)*);492	}};493	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {494		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));495		nix_expr_inner!(@o($o) $($tt)*);496	};497	(@field($o:ident)) => {};498	($field:ident $($tt:tt)*) => {{499		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};500		#[allow(unused_mut, reason = "might be used if indexed")]501		let mut out = NixExprBuilder::field($field.clone());502		nix_expr_inner!(@field(out) $($tt)*);503		out504	}};505	($v:literal) => {{506		use $crate::better_nix_eval::NixExprBuilder;507		NixExprBuilder::string($v)508	}};509	({$v:expr}) => {{510		use $crate::better_nix_eval::NixExprBuilder;511		NixExprBuilder::serialized(&$v)512	}}513}514#[macro_export]515macro_rules! nix_expr {516	($($tt:tt)+) => {{517		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};518		let expr = nix_expr_inner!($($tt)+);519		Field::new(expr.session(), expr.out)520	}};521}522523#[macro_export]524macro_rules! nix_go {525	(@o($o:ident) . $var:ident $($tt:tt)*) => {{526		$o.push(Index::attr(stringify!($var)));527		nix_go!(@o($o) $($tt)*);528	}};529	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{530		$o.push(Index::attr(&$v));531		nix_go!(@o($o) $($tt)*);532	}};533	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{534		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));535		nix_go!(@o($o) $($tt)*);536	}};537	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {538		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));539		nix_go!(@o($o) $($tt)*);540	};541	(@o($o:ident) | $($var:tt)*) => {542		$o.push(Index::Pipe($crate::nix_expr_inner!($($var)+)));543	};544	(@o($o:ident)) => {};545	($field:ident $($tt:tt)+) => {{546		use $crate::{nix_go, better_nix_eval::Index};547		let field = $field.clone();548		let mut out = vec![];549		nix_go!(@o(out) $($tt)*);550		field.select(out).await?551	}}552}553#[macro_export]554macro_rules! nix_go_json {555	($($tt:tt)*) => {{556		$crate::nix_go!($($tt)*).as_json().await?557	}};558}559560#[derive(Clone)]561pub enum Index {562	Var(String),563	String(String),564	#[allow(dead_code)]565	Apply(String),566	#[allow(dead_code)]567	Expr(NixExprBuilder),568	ExprApply(NixExprBuilder),569	Pipe(NixExprBuilder),570}571impl Index {572	pub fn var(v: impl AsRef<str>) -> Self {573		let v = v.as_ref();574		assert!(575			!(v.contains('.') | v.contains(' ')),576			"bad variable name: {v}"577		);578		Self::Var(v.to_owned())579	}580	pub fn attr(v: impl AsRef<str>) -> Self {581		Self::String(v.as_ref().to_owned())582	}583	#[allow(dead_code)]584	pub fn apply(v: impl Serialize) -> Self {585		let serialized = nixlike::serialize(v).expect("invalid value for apply");586		Self::Apply(serialized.trim_end().to_owned())587	}588}589impl Display for Index {590	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {591		match self {592			Index::Var(v) => {593				write!(f, "{v}")594			}595			Index::String(k) => {596				let v = nixlike::format_identifier(k.as_str());597				write!(f, ".{v}")598			}599			Index::Apply(o) => {600				write!(f, "<apply>({o})")601			}602			Index::Expr(e) => {603				write!(f, "[{}]", e.out)604			}605			Index::ExprApply(e) => {606				write!(f, "<apply>({})", e.out)607			}608			Index::Pipe(e) => {609				write!(f, "<map>({})", e.out)610			}611		}612	}613}614impl fmt::Debug for Index {615	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {616		write!(f, "{self}")617	}618}619struct PathDisplay<'i>(&'i [Index]);620impl Display for PathDisplay<'_> {621	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {622		for i in self.0 {623			write!(f, "{i}")?;624		}625		Ok(())626	}627}628struct FieldInner {629	full_path: Option<Vec<Index>>,630	session: NixSession,631	value: Option<u32>,632}633fn context(op: &str, full_path: Option<&[Index]>, query: &str) -> String {634	if let Some(full_path) = &full_path {635		format!("on {op}, full path: {}", PathDisplay(full_path))636	} else {637		format!("query: {query:?}")638	}639}640#[derive(Clone)]641pub struct Field(Arc<FieldInner>);642impl Field {643	fn root(session: NixSession) -> Self {644		Self(Arc::new(FieldInner {645			full_path: Some(vec![]),646			session,647			value: None,648		}))649	}650	async fn new(session: NixSession, query: &str) -> Result<Self> {651		let vid = session652			.0653			.lock()654			.await655			.execute_assign(query)656			.await657			.with_context(|| context("new root", None, query))?;658		Ok(Self(Arc::new(FieldInner {659			full_path: None,660			session,661			value: Some(vid),662		})))663	}664	pub async fn field(session: NixSession, field: &str) -> Result<Self> {665		Self::root(session).select([Index::var(field)]).await666	}667	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {668		let mut used_fields = Vec::new();669		let mut name = name.into_iter();670671		let mut full_path = self.0.full_path.clone();672		let mut query = if let Some(id) = self.0.value {673			format!("sess_field_{id}")674		} else {675			let first = name.next();676			if let Some(Index::Var(i)) = first {677				if let Some(full_path) = &mut full_path {678					full_path.push(Index::Var(i.clone()));679				}680				i.clone()681			} else {682				panic!("first path item should be variable, got {first:?}")683			}684		};685		for v in name {686			if let Some(full_path) = &mut full_path {687				full_path.push(v.clone());688			}689			match v {690				Index::Var(_) => panic!("var item may only be first"),691				Index::String(s) => {692					let escaped = nixlike::serialize(s)?;693					query.push('.');694					query.push_str(escaped.trim());695				}696				Index::Apply(a) => {697					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`698					query = format!("({query} {a})");699				}700				Index::Expr(e) => {701					let index = Field::new(self.0.session.clone(), &e.out).await?;702					used_fields.push(index.clone());703					query.push('.');704					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));705					query.push_str(&index);706				}707				Index::ExprApply(e) => {708					let index = Field::new(self.0.session.clone(), &e.out).await?;709					used_fields.push(index.clone());710					query.push(' ');711					let index = format!("sess_field_{}", index.0.value.expect("value"));712					query.push_str(&index);713					query = format!("({query})");714				}715				Index::Pipe(v) => {716					let index = Field::new(self.0.session.clone(), &v.out).await?;717					used_fields.push(index.clone());718					let index = format!("sess_field_{}", index.0.value.expect("value"));719					query = format!("({index} {query})");720				}721			}722		}723724		let vid = self725			.0726			.session727			.0728			.lock()729			.await730			.execute_assign(&query)731			.await732			.with_context(|| {733				if let Some(full_path) = &full_path {734					format!("full path: {}", PathDisplay(full_path))735				} else {736					format!("query: {query:?}")737				}738			})?;739		Ok(Self(Arc::new(FieldInner {740			full_path,741			session: self.0.session.clone(),742			value: Some(vid),743		})))744	}745	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {746		let id = self.0.value.expect("can't serialize root field");747		let query = format!("sess_field_{id}");748		self.0749			.session750			.0751			.lock()752			.await753			.execute_expression_to_json(&query)754			.await755			.with_context(|| context("as_json", self.0.full_path.as_deref(), &query))756	}757	#[allow(dead_code)]758	pub async fn has_field(&self, name: &str) -> Result<bool> {759		let id = self.0.value.expect("can't list root fields");760		let key = nixlike::escape_string(name);761		let query = format!("sess_field_{id} ? {key}");762		self.0763			.session764			.0765			.lock()766			.await767			.execute_expression_to_json(&query)768			.await769			.with_context(|| context("has_field", self.0.full_path.as_deref(), &query))770	}771	pub async fn list_fields(&self) -> Result<Vec<String>> {772		let id = self.0.value.expect("can't list root fields");773		let query = format!("builtins.attrNames sess_field_{id}");774		self.0775			.session776			.0777			.lock()778			.await779			.execute_expression_to_json(&query)780			.await781			.with_context(|| context("list field", self.0.full_path.as_deref(), &query))782	}783	pub async fn type_of(&self) -> Result<String> {784		let id = self.0.value.expect("can't list root fields");785		let query = format!("builtins.typeOf sess_field_{id}");786		self.0787			.session788			.0789			.lock()790			.await791			.execute_expression_to_json(&query)792			.await793			.with_context(|| context("type_of", self.0.full_path.as_deref(), &query))794	}795	#[allow(dead_code)]796	pub async fn import(&self) -> Result<Self> {797		let import = Self::new(self.0.session.clone(), "import").await?;798		Ok(nix_go!(self | import))799	}800	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {801		let id = self.0.value.expect("can't use build on not-value");802		let query = format!(":b sess_field_{id}");803		let vid = self804			.0805			.session806			.0807			.lock()808			.await809			.execute_expression_raw(&query, &mut NixHandler::default())810			.await?;811		ensure!(812			!vid.is_empty(),813			"build failed: {}",814			context("build", self.0.full_path.as_deref(), &query),815		);816		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")817		else {818			panic!("unexpected build output: {vid:?}");819		};820		let outputs = vid821			.split('\n')822			.filter(|v| !v.is_empty())823			.map(|v| v.split_once(" -> ").expect("unexpected build output"))824			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))825			.collect();826		Ok(outputs)827	}828}829impl Drop for FieldInner {830	fn drop(&mut self) {831		if let Some(id) = self.value {832			if let Ok(mut lock) = self.session.0.try_lock() {833				lock.free_list.push(id)834			}835			// Leaked836		}837	}838}839struct NixSessionPoolInner {840	flake: OsString,841	nix_args: Vec<OsString>,842}843844#[derive(Debug)]845pub struct NixPoolError(anyhow::Error);846impl From<anyhow::Error> for NixPoolError {847	fn from(value: anyhow::Error) -> Self {848		Self(value)849	}850}851impl std::error::Error for NixPoolError {}852impl std::fmt::Display for NixPoolError {853	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {854		self.0.fmt(f)855	}856}857impl r2d2::ManageConnection for NixSessionPoolInner {858	type Connection = NixSessionInner;859	type Error = NixPoolError;860	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {861		let _v = TOKIO_RUNTIME862			.get()863			.expect("missed tokio runtime init!")864			.enter();865		Ok(futures::executor::block_on(NixSessionInner::new(866			self.flake.as_os_str(),867			self.nix_args.iter().map(OsString::as_os_str),868		))?)869	}870871	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {872		let _v = TOKIO_RUNTIME873			.get()874			.expect("missed tokio runtime init!")875			.enter();876		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;877		if res != 4 {878			return Err(anyhow!("sanity check failed").into());879		};880		Ok(())881	}882883	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {884		false885	}886}887pub struct NixSessionPool(Pool<NixSessionPoolInner>);888impl NixSessionPool {889	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {890		let inner = tokio::task::block_in_place(|| {891			r2d2::Builder::<NixSessionPoolInner>::new()892				.min_idle(Some(0))893				.build(NixSessionPoolInner { flake, nix_args })894		})?;895		Ok(Self(inner))896	}897	pub async fn get(&self) -> Result<NixSession> {898		let v = tokio::task::block_in_place(|| self.0.get())?;899		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))900	}901}902903pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();