git.delta.rocks / jrsonnet / refs/commits / 353ae3be2d27

difftreelog

fix handle nix repl errors without extra synchronization

Yaroslav Bolyukin2023-10-29parent: #5250aa0.patch.diff
in: trunk

1 file changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
before · cmds/fleet/src/better_nix_eval.rs
1use std::ffi::{OsStr, OsString};2use std::process::Stdio;3use std::sync::{Arc, Mutex, OnceLock};45use abort_on_drop::ChildTask;6use anyhow::{anyhow, bail, ensure, Context, Result};7use futures::StreamExt;8use r2d2::{Pool, PooledConnection};9use serde::de::DeserializeOwned;10use serde::Deserialize;11use tokio::io::AsyncWriteExt;12use tokio::process::{ChildStdin, ChildStdout, Command};13use tokio::sync::oneshot;14use tokio_util::codec::{FramedRead, LinesCodec};15use tracing::debug;1617use crate::command::{process_child_stderr, ErrorRecorder, ErrorRecorderT, NixHandler};1819const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";20// To synchronize stderr and stdout. It works, yet I hate this.21// There is no other way to catch errors, because they are coming from different streams, and they are not synchronized in tokio.22const ERROR_DELIMITER: &str = "FLEET_MAGIC_ERROR_DELIMITER";2324pub struct NixSessionInner {25	full_delimiter: String,26	#[allow(dead_code)]27	stderr_handler: ChildTask<Result<()>>,28	error_recorder: ErrorRecorderT,29	read: FramedRead<ChildStdout, LinesCodec>,30	stdin: ChildStdin,31	string_wrapping: (String, String),32	number_wrapping: (String, String),33	error_delimiter: String,3435	next_id: u32,36	free_list: Vec<u32>,37}38const TRAIN_STRING: &str = "\"TRAIN_STRING\"";39const TRAIN_NUMBER: &str = "13141516";4041struct ErrorRecorderHandle {42	handle: ErrorRecorderT,43}44impl ErrorRecorderHandle {}45impl Drop for ErrorRecorderHandle {46	fn drop(&mut self) {47		let mut recorded = self.handle.lock().unwrap();48		assert!(recorded.is_some(), "exclusive");49		*recorded = None;50	}51}5253struct ErrorCollector {54	collected: Arc<Mutex<Vec<String>>>,55	delim: String,56	got_delim: Option<oneshot::Sender<()>>,57}58impl ErrorRecorder for ErrorCollector {59	fn push_message(&mut self, msg: &str) -> bool {60		if msg == self.delim {61			let _ = self.got_delim62				.take()63				.expect("error delim is only expected once")64				.send(());65			 return true;66		}67		let Some(msg) = msg.strip_prefix("@nix ") else {68			return false;69		};70		#[derive(Deserialize)]71		struct ErrorAction {72			action: String,73			level: u32,74			msg: String,75		}76		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {77			return false;78		};79		if act.action != "msg" || act.level != 0 {80			return false;81		}82		self.collected.lock().unwrap().push(act.msg);83		true84	}85}8687impl NixSessionInner {88	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {89		let mut cmd = Command::new("nix");90		cmd.arg("repl")91			.arg(flake)92			.arg("--log-format")93			.arg("internal-json");94		for arg in extra_args {95			cmd.arg(arg);96		}97		cmd.stdin(Stdio::piped());98		cmd.stdout(Stdio::piped());99		cmd.stderr(Stdio::piped());100		let cmd = cmd.spawn()?;101		let stdout = cmd.stdout.unwrap();102		let stderr = cmd.stderr.unwrap();103		let mut stdin = cmd.stdin.unwrap();104		let error_recorder = ErrorRecorderT::default();105		let err_recorder = error_recorder.clone();106		let stderr_handler = abort_on_drop::ChildTask::from(tokio::spawn(async move {107			let mut handler = NixHandler::default();108			process_child_stderr(stderr, &mut handler, err_recorder).await109		}));110		// Standard repl hello doesn't work with internal-json logger111		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;112		stdin.write_all(b"\n").await?;113		stdin.flush().await?;114		let mut read = FramedRead::new(stdout, LinesCodec::new());115		let mut full_delimiter = None;116		while let Some(line) = read.next().await {117			let line = line?;118			if line.contains(REPL_DELIMITER) {119				debug!("discovered repl delimiter with added colors: {line}");120				full_delimiter = Some(line.to_owned());121				break;122			}123		}124		let Some(full_delimiter) = full_delimiter else {125			bail!("failed to discover delimiter");126		};127		let mut res = Self {128			full_delimiter,129			error_delimiter: "[[filled after training]]".to_owned(),130			stderr_handler,131			error_recorder,132			read,133			stdin,134			string_wrapping: Default::default(),135			number_wrapping: Default::default(),136137			next_id: 0,138			free_list: vec![],139		};140		res.train().await?;141		Ok(res)142	}143	async fn train(&mut self) -> Result<()> {144		{145			let full_string = self.execute_expression_raw(TRAIN_STRING).await?;146			let string_offset = full_string.find(TRAIN_STRING).expect("contained");147			let string_prefix = &full_string[..string_offset];148			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];149			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());150		}151		{152			let full_number = self.execute_expression_raw(TRAIN_NUMBER).await?;153			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");154			let number_prefix = &full_number[..number_offset];155			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];156			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());157		}158		{159			struct TrainingErrorCollector(Option<oneshot::Sender<String>>);160			impl ErrorRecorder for TrainingErrorCollector {161				fn push_message(&mut self, msg: &str) -> bool {162					if msg.contains(ERROR_DELIMITER) {163						let _ = self164							.0165							.take()166							.expect("error delimiter is sent once")167							.send(msg.to_owned());168					}169					true170				}171			}172			let (tx, rx) = oneshot::channel();173			let _handle = self.record_error(TrainingErrorCollector(Some(tx)));174			self.send_command(ERROR_DELIMITER).await?;175			self.send_command(REPL_DELIMITER).await?;176			self.read_until_delimiter().await?;177			let msg = rx.await?;178			self.error_delimiter = msg;179		}180		Ok(())181	}182	fn record_error(&mut self, v: impl ErrorRecorder + 'static) -> ErrorRecorderHandle {183		{184			let mut recorder = self.error_recorder.lock().unwrap();185			assert!(recorder.is_none(), "recorder is already started");186			*recorder = Some(Box::new(v));187		}188		ErrorRecorderHandle {189			handle: self.error_recorder.clone(),190		}191	}192	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {193		self.stdin.write_all(cmd.as_ref()).await?;194		self.stdin.write_all(b"\n").await?;195		Ok(())196	}197	async fn read_until_delimiter(&mut self) -> Result<String> {198		let mut out = String::new();199		while let Some(line) = self.read.next().await {200			let line = line?;201			if line == self.full_delimiter {202				return Ok(out);203			}204			if !out.is_empty() {205				out.push('\n');206			}207			out.push_str(&line);208		}209		bail!("didn't reached delimiter");210	}211	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {212		let num = self.number_wrapping.clone();213		let n = self.execute_expression_wrapping(expr, &num).await?;214		Ok(n.parse::<u64>()?)215	}216	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {217		let num = self.string_wrapping.clone();218		let n = self.execute_expression_wrapping(expr, &num).await?;219		let str: String = serde_json::from_str(&n)?;220		Ok(str)221	}222	async fn execute_expression_to_json<V: DeserializeOwned>(223		&mut self,224		expr: impl AsRef<[u8]>,225	) -> Result<V> {226		let mut fexpr = b"builtins.toJSON (".to_vec();227		fexpr.extend_from_slice(expr.as_ref());228		fexpr.push(b')');229		let v = self.execute_expression_string(fexpr).await?;230		Ok(serde_json::from_str(&v)?)231	}232	async fn execute_expression_wrapping(233		&mut self,234		expr: impl AsRef<[u8]>,235		wrapping: &(String, String),236	) -> Result<String> {237		let collected = Arc::new(Mutex::new(vec![]));238		let (etx, erx) = oneshot::channel();239		let _collector = self.record_error(ErrorCollector{collected:collected.clone(), delim: self.error_delimiter.clone(), got_delim: Some(etx)});240		let res = self.execute_expression_raw(expr).await?;241		let _ = self.execute_expression_raw(ERROR_DELIMITER).await?;242		let _ = erx.await;243		if res.is_empty() {244			let c = collected.lock().unwrap();245			if c.is_empty() {246				bail!("expected expression, got nothing")247			}248			bail!("{}", c.join("\n"));249		}250		drop(_collector);251		let Some(res) = res.strip_prefix(&wrapping.0) else {252			bail!("invalid type")253		};254		let Some(res) = res.strip_suffix(&wrapping.1) else {255			bail!("invalid type")256		};257		Ok(res.to_owned())258	}259	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {260		let collected = Arc::new(Mutex::new(vec![]));261		let (etx, erx) = oneshot::channel();262		let _collector = self.record_error(ErrorCollector{collected:collected.clone(), delim: self.error_delimiter.clone(), got_delim: Some(etx)});263		let v = self.execute_expression_raw(expr).await?;264		let _ = self.execute_expression_raw(ERROR_DELIMITER).await;265		let _ = erx.await;266267		let c = collected.lock().unwrap();268		if !c.is_empty() {269			bail!("{}", c.join("\n"));270		}271		ensure!(v.is_empty(), "unexpected expression result");272		Ok(())273	}274	async fn execute_expression_raw(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {275		self.send_command(expr).await?;276		// It will be echoed277		self.send_command(REPL_DELIMITER).await?;278		self.read_until_delimiter().await279	}280	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {281		let id = self.allocate_id();282		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))283			.await?;284		Ok(id)285	}286287	/// Id should be immediately used288	fn allocate_id(&mut self) -> u32 {289		if let Some(free) = self.free_list.pop() {290			free291		} else {292			let v = self.next_id;293			self.next_id += 1;294			v295		}296	}297	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.298	// async fn free_id(&mut self, id: u32) -> Result<()> {299	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))300	// 		.await?;301	// 	self.free_list.push(id);302	// 	Ok(())303	// }304}305306#[derive(Clone)]307pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);308309#[derive(Clone, Debug)]310enum Index {311	String(String),312	// Idx(u32),313}314pub struct Field {315	full_path: Vec<Index>,316	session: NixSession,317	value: Option<u32>,318}319impl Field {320	fn root(session: NixSession) -> Self {321		Self {322			full_path: vec![],323			session,324			value: None,325		}326	}327	pub async fn field(session: NixSession, field: &str) -> Result<Self> {328		Self::root(session).get_field_deep([field]).await329	}330	pub async fn get_field(&self, name: &str) -> Result<Self> {331		self.get_field_deep([name]).await332	}333	pub async fn get_field_deep<'a>(334		&self,335		name: impl IntoIterator<Item = &'a str>,336	) -> Result<Self> {337		let mut iter = name.into_iter();338339		let mut full_path = self.full_path.clone();340		let mut query = if let Some(id) = self.value {341			format!("sess_field_{id}")342		} else {343			let first = iter.next().expect("name not empty");344			ensure!(345				!(first.contains('.') | first.contains(' ')),346				"bad name for root query: {first}"347			);348			full_path.push(Index::String(first.to_string()));349			first.to_string()350		};351		for v in iter {352			full_path.push(Index::String(v.to_string()));353			// Escape354			let escaped = nixlike::serialize(v)?;355			let escaped = escaped.trim();356			query.push('.');357			query.push_str(escaped);358		}359360		let vid = self361			.session362			.0363			.lock()364			.await365			.execute_assign(&query)366			.await367			.with_context(|| format!("full path: {:?}", full_path))?;368		Ok(Self {369			full_path,370			session: self.session.clone(),371			value: Some(vid),372		})373	}374	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {375		let id = self.value.expect("can't serialize root field");376		self.session377			.0378			.lock()379			.await380			.execute_expression_to_json(&format!("sess_field_{id}"))381			.await382			.with_context(|| format!("full path: {:?}", self.full_path))383	}384	pub async fn list_fields(&self) -> Result<Vec<String>> {385		let id = self.value.expect("can't list root fields");386		self.session387			.0388			.lock()389			.await390			.execute_expression_to_json(&format!("builtins.attrNames sess_field_{id}"))391			.await392			.with_context(|| format!("full path: {:?}", self.full_path))393	}394}395impl Drop for Field {396	fn drop(&mut self) {397		if let Some(id) = self.value {398			if let Ok(mut lock) = self.session.0.try_lock() {399				lock.free_list.push(id)400			}401			// Leaked402		}403	}404}405struct NixSessionPoolInner {406	flake: OsString,407	nix_args: Vec<OsString>,408}409410#[derive(Debug)]411pub struct NixPoolError(anyhow::Error);412impl From<anyhow::Error> for NixPoolError {413	fn from(value: anyhow::Error) -> Self {414		Self(value)415	}416}417impl std::error::Error for NixPoolError {}418impl std::fmt::Display for NixPoolError {419	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {420		self.0.fmt(f)421	}422}423impl r2d2::ManageConnection for NixSessionPoolInner {424	type Connection = NixSessionInner;425	type Error = NixPoolError;426	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {427		let _v = TOKIO_RUNTIME428			.get()429			.expect("missed tokio runtime init!")430			.enter();431		Ok(futures::executor::block_on(NixSessionInner::new(432			self.flake.as_os_str(),433			self.nix_args.iter().map(OsString::as_os_str),434		))?)435	}436437	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {438		let _v = TOKIO_RUNTIME439			.get()440			.expect("missed tokio runtime init!")441			.enter();442		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;443		if res != 4 {444			return Err(anyhow!("sanity check failed").into());445		};446		Ok(())447	}448449	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {450		false451	}452}453pub struct NixSessionPool(Pool<NixSessionPoolInner>);454impl NixSessionPool {455	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {456		let inner = tokio::task::block_in_place(|| {457			r2d2::Builder::<NixSessionPoolInner>::new()458				.min_idle(Some(0))459				.build(NixSessionPoolInner { flake, nix_args })460		})?;461		Ok(Self(inner))462	}463	pub async fn get(&self) -> Result<NixSession> {464		let v = tokio::task::block_in_place(|| self.0.get())?;465		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))466	}467}468469pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
after · cmds/fleet/src/better_nix_eval.rs
1use std::ffi::{OsStr, OsString};2use std::fmt::Display;3use std::process::Stdio;4use std::sync::{Arc, OnceLock};56use anyhow::{anyhow, bail, ensure, Context, Result};7use futures::StreamExt;8use itertools::Itertools;9use r2d2::{Pool, PooledConnection};10use serde::de::DeserializeOwned;11use serde::Deserialize;12use tokio::io::AsyncWriteExt;13use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};14use tokio::select;15use tokio::sync::{mpsc, oneshot};16use tokio_util::codec::{FramedRead, LinesCodec};17use tracing::{debug, error, warn};1819use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2021const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2223pub struct NixSessionInner {24	full_delimiter: String,25	nix_handler: ClonableHandler<NixHandler>,26	out: OutputHandler,27	stdin: ChildStdin,28	string_wrapping: (String, String),29	number_wrapping: (String, String),3031	next_id: u32,32	free_list: Vec<u32>,33}34const TRAIN_STRING: &str = "\"TRAIN_STRING\"";35const TRAIN_NUMBER: &str = "13141516";3637#[must_use]38struct ErrorCollector<'i, H> {39	collected: Vec<String>,40	inner: &'i mut H,41}42impl<'i, H> ErrorCollector<'i, H> {43	fn new(inner: &'i mut H) -> Self {44		Self {45			collected: vec![],46			inner,47		}48	}49}50impl<H> ErrorCollector<'_, H> {51	fn handle_line_inner(&mut self, msg: &str) -> bool {52		let Some(msg) = msg.strip_prefix("@nix ") else {53			return false;54		};55		#[derive(Deserialize)]56		struct ErrorAction {57			action: String,58			level: u32,59			msg: String,60		}61		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {62			return false;63		};64		if act.action != "msg" || act.level != 0 {65			return false;66		}67		self.collected.push(act.msg);68		true69	}70	fn finish(self) -> Result<()> {71		// fn dedent(s: String) -> String {72		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)73		// }74		if !self.collected.is_empty() {75			bail!("{}", self.collected.iter().map(|v| {76				if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {77					let v = unindent::unindent(f.trim_start());78					v.trim().to_owned()79				} else {80					v.to_owned()81				}82			}).join("\n"));83		}84		Ok(())85	}86	fn flush(self) {87		for line in self.collected {88			warn!("{line}");89		}90	}91}92impl<H: Handler> Handler for ErrorCollector<'_, H> {93	fn handle_line(&mut self, e: &str) {94		if self.handle_line_inner(e) {95			return;96		}97		self.inner.handle_line(e)98	}99}100101enum OutputLine {102	Out(String),103	Err(String),104}105struct OutputHandler {106	rx: mpsc::Receiver<OutputLine>,107	_cancel_handle: oneshot::Receiver<()>,108}109impl OutputHandler {110	fn new(out: ChildStdout, err: ChildStderr) -> Self {111		let mut out = FramedRead::new(out, LinesCodec::new());112		let mut err = FramedRead::new(err, LinesCodec::new());113		let (tx, rx) = mpsc::channel(20);114		let (mut cancelled, _cancel_handle) = oneshot::channel();115		tokio::spawn(async move {116			loop {117				select! {118					// We should receive errors earlier than synchronization119					biased;120					e = err.next() => {121						let Some(Ok(e)) = e else {122							if e.is_some() {123								error!("bad repl stderr: {e:?}");124							}125							continue;126						};127						let _ = tx.send(OutputLine::Err(e)).await;128					}129					o = out.next() => {130						let Some(Ok(o)) = o else {131							if o.is_some() {132								error!("bad repl stdout: {o:?}");133							}134							continue;135						};136						let _ = tx.send(OutputLine::Out(o)).await;137					}138					// Reader doesn't care about stdout, as this is cancelled.139					// Error still might be useful, to process leftover span closures?140					_ = cancelled.closed() => {141						break;142					}143				}144			}145		});146		Self { rx, _cancel_handle }147	}148	async fn next(&mut self) -> Option<OutputLine> {149		self.rx.recv().await150	}151}152153impl NixSessionInner {154	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {155		let mut cmd = Command::new("nix");156		cmd.arg("repl")157			.arg(flake)158			.arg("--log-format")159			.arg("internal-json");160		for arg in extra_args {161			cmd.arg(arg);162		}163		cmd.stdin(Stdio::piped());164		cmd.stdout(Stdio::piped());165		cmd.stderr(Stdio::piped());166		let cmd = cmd.spawn()?;167		let stdout = cmd.stdout.unwrap();168		let stderr = cmd.stderr.unwrap();169		let mut out = OutputHandler::new(stdout, stderr);170		let mut stdin = cmd.stdin.unwrap();171		// Standard repl hello doesn't work with internal-json logger172		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;173		stdin.write_all(b"\n").await?;174		stdin.flush().await?;175		let nix_handler = NixHandler::default();176		let mut full_delimiter = None;177		while let Some(line) = out.next().await {178			let line = match line {179				OutputLine::Out(o) => o,180				OutputLine::Err(_e) => {181					// Handle startup errors, but skip repl hello?182					//nix_handler.handle_line(&e);183					continue;184				}185			};186			if line.contains(REPL_DELIMITER) {187				debug!("discovered repl delimiter with added colors: {line}");188				full_delimiter = Some(line.to_owned());189				break;190			}191		}192		let Some(full_delimiter) = full_delimiter else {193			bail!("failed to discover delimiter");194		};195		let mut res = Self {196			full_delimiter,197			nix_handler: ClonableHandler::new(nix_handler),198			out,199			stdin,200			string_wrapping: Default::default(),201			number_wrapping: Default::default(),202203			next_id: 0,204			free_list: vec![],205		};206		res.train().await?;207		Ok(res)208	}209	async fn train(&mut self) -> Result<()> {210		{211			let full_string = self212				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)213				.await?;214			let string_offset = full_string.find(TRAIN_STRING).expect("contained");215			let string_prefix = &full_string[..string_offset];216			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];217			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());218		}219		{220			let full_number = self221				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)222				.await?;223			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");224			let number_prefix = &full_number[..number_offset];225			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];226			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());227		}228		Ok(())229	}230	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {231		self.stdin.write_all(cmd.as_ref()).await?;232		self.stdin.write_all(b"\n").await?;233		Ok(())234	}235	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {236		let mut out = String::new();237		while let Some(line) = self.out.next().await {238			let line = match line {239				OutputLine::Out(out) => out,240				OutputLine::Err(err) => {241					err_handler.handle_line(&err);242					continue;243				}244			};245			if line == self.full_delimiter {246				return Ok(out);247			}248			if !out.is_empty() {249				out.push('\n');250			}251			out.push_str(&line);252		}253		bail!("didn't reached delimiter");254	}255	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {256		let num = self.number_wrapping.clone();257		let n = self.execute_expression_wrapping(expr, &num).await?;258		Ok(n.parse::<u64>()?)259	}260	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {261		let num = self.string_wrapping.clone();262		let n = self.execute_expression_wrapping(expr, &num).await?;263		let str: String = serde_json::from_str(&n)?;264		Ok(str)265	}266	async fn execute_expression_to_json<V: DeserializeOwned>(267		&mut self,268		expr: impl AsRef<[u8]>,269	) -> Result<V> {270		let mut fexpr = b"builtins.toJSON (".to_vec();271		fexpr.extend_from_slice(expr.as_ref());272		fexpr.push(b')');273		let v = self.execute_expression_string(fexpr).await?;274		Ok(serde_json::from_str(&v)?)275	}276	async fn execute_expression_wrapping(277		&mut self,278		expr: impl AsRef<[u8]>,279		wrapping: &(String, String),280	) -> Result<String> {281		let mut nix_handler = self.nix_handler.clone();282		let mut collected = ErrorCollector::new(&mut nix_handler);283		let res = self.execute_expression_raw(expr, &mut collected).await?;284		if res.is_empty() {285			collected.finish()?;286			bail!("expected expression, got nothing")287		} else {288			collected.flush()289		};290		let Some(res) = res.strip_prefix(&wrapping.0) else {291			bail!("invalid type")292		};293		let Some(res) = res.strip_suffix(&wrapping.1) else {294			bail!("invalid type")295		};296		Ok(res.to_owned())297	}298	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {299		let mut nix_handler = self.nix_handler.clone();300		let mut collected = ErrorCollector::new(&mut nix_handler);301		let v = self.execute_expression_raw(expr, &mut collected).await?;302		collected.finish()?;303		ensure!(v.is_empty(), "unexpected expression result");304		Ok(())305	}306	async fn execute_expression_raw(307		&mut self,308		expr: impl AsRef<[u8]>,309		err_handler: &mut dyn Handler,310	) -> Result<String> {311		self.send_command(expr).await?;312		// It will be echoed313		self.send_command(REPL_DELIMITER).await?;314		self.read_until_delimiter(err_handler).await315	}316	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {317		let id = self.allocate_id();318		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))319			.await?;320		Ok(id)321	}322323	/// Id should be immediately used324	fn allocate_id(&mut self) -> u32 {325		if let Some(free) = self.free_list.pop() {326			free327		} else {328			let v = self.next_id;329			self.next_id += 1;330			v331		}332	}333	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.334	// async fn free_id(&mut self, id: u32) -> Result<()> {335	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))336	// 		.await?;337	// 	self.free_list.push(id);338	// 	Ok(())339	// }340}341342#[derive(Clone)]343pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);344345#[derive(Clone)]346enum Index {347	String(String),348	// Idx(u32),349}350impl Display for Index {351	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {352		match self {353			Index::String(k) => {354				let v = nixlike::format_identifier(k.as_str());355				write!(f, ".{v}")356			}357		}358	}359}360struct PathDisplay<'i>(&'i [Index]);361impl Display for PathDisplay<'_> {362	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {363		write!(f, "flake")?;364		for i in self.0 {365			write!(f, "{i}")?;366		}367		Ok(())368	}369}370pub struct Field {371	full_path: Vec<Index>,372	session: NixSession,373	value: Option<u32>,374}375impl Field {376	fn root(session: NixSession) -> Self {377		Self {378			full_path: vec![],379			session,380			value: None,381		}382	}383	pub async fn field(session: NixSession, field: &str) -> Result<Self> {384		Self::root(session).get_field_deep([field]).await385	}386	pub async fn get_json_deep<'a, V: DeserializeOwned>(387		&self,388		name: impl IntoIterator<Item = &'a str>,389	) -> Result<V> {390		let field = self.get_field_deep(name).await?;391		field.as_json().await392	}393	pub async fn get_field(&self, name: &str) -> Result<Self> {394		self.get_field_deep([name]).await395	}396	pub async fn get_field_deep<'a>(397		&self,398		name: impl IntoIterator<Item = &'a str>,399	) -> Result<Self> {400		let mut iter = name.into_iter();401402		let mut full_path = self.full_path.clone();403		let mut query = if let Some(id) = self.value {404			format!("sess_field_{id}")405		} else {406			let first = iter.next().expect("name not empty");407			ensure!(408				!(first.contains('.') | first.contains(' ')),409				"bad name for root query: {first}"410			);411			full_path.push(Index::String(first.to_string()));412			first.to_string()413		};414		for v in iter {415			full_path.push(Index::String(v.to_string()));416			// Escape417			let escaped = nixlike::serialize(v)?;418			let escaped = escaped.trim();419			query.push('.');420			query.push_str(escaped);421		}422423		let vid = self424			.session425			.0426			.lock()427			.await428			.execute_assign(&query)429			.await430			.with_context(|| format!("full path: {}", PathDisplay(&full_path)))?;431		Ok(Self {432			full_path,433			session: self.session.clone(),434			value: Some(vid),435		})436	}437	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {438		let id = self.value.expect("can't serialize root field");439		self.session440			.0441			.lock()442			.await443			.execute_expression_to_json(&format!("sess_field_{id}"))444			.await445			.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))446	}447	pub async fn list_fields(&self) -> Result<Vec<String>> {448		let id = self.value.expect("can't list root fields");449		self.session450			.0451			.lock()452			.await453			.execute_expression_to_json(&format!("builtins.attrNames sess_field_{id}"))454			.await455			.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))456	}457}458impl Drop for Field {459	fn drop(&mut self) {460		if let Some(id) = self.value {461			if let Ok(mut lock) = self.session.0.try_lock() {462				lock.free_list.push(id)463			}464			// Leaked465		}466	}467}468struct NixSessionPoolInner {469	flake: OsString,470	nix_args: Vec<OsString>,471}472473#[derive(Debug)]474pub struct NixPoolError(anyhow::Error);475impl From<anyhow::Error> for NixPoolError {476	fn from(value: anyhow::Error) -> Self {477		Self(value)478	}479}480impl std::error::Error for NixPoolError {}481impl std::fmt::Display for NixPoolError {482	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {483		self.0.fmt(f)484	}485}486impl r2d2::ManageConnection for NixSessionPoolInner {487	type Connection = NixSessionInner;488	type Error = NixPoolError;489	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {490		let _v = TOKIO_RUNTIME491			.get()492			.expect("missed tokio runtime init!")493			.enter();494		Ok(futures::executor::block_on(NixSessionInner::new(495			self.flake.as_os_str(),496			self.nix_args.iter().map(OsString::as_os_str),497		))?)498	}499500	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {501		let _v = TOKIO_RUNTIME502			.get()503			.expect("missed tokio runtime init!")504			.enter();505		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;506		if res != 4 {507			return Err(anyhow!("sanity check failed").into());508		};509		Ok(())510	}511512	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {513		false514	}515}516pub struct NixSessionPool(Pool<NixSessionPoolInner>);517impl NixSessionPool {518	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {519		let inner = tokio::task::block_in_place(|| {520			r2d2::Builder::<NixSessionPoolInner>::new()521				.min_idle(Some(0))522				.build(NixSessionPoolInner { flake, nix_args })523		})?;524		Ok(Self(inner))525	}526	pub async fn get(&self) -> Result<NixSession> {527		let v = tokio::task::block_in_place(|| self.0.get())?;528		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))529	}530}531532pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();