git.delta.rocks / jrsonnet / refs/commits / 353ae3be2d27

difftreelog

source

cmds/fleet/src/nix_eval.rs6.4 KiBsourcehistory
1//! Calling nix eval for everything is slow, it is not easy to link nix evaluator itself,2//! and tvix-nix doesn't have proper flake support. Fleets solution: automating nix repl calls.3//!4//! Api is synchronous, yet it is good enough with pooling, and in environment without IFDs for using5//! those blocking calls from async code.67use std::borrow::Cow;8use std::sync::{Arc, Mutex};9use std::time::Instant;1011use anyhow::{anyhow, bail, ensure, Context, Result};12use itertools::Itertools;13use r2d2::PooledConnection;14use rexpect::session::{PtyReplSession, PtySession};15use serde::de::DeserializeOwned;16use std::ffi::OsString;17use tracing::info_span;1819fn parse_error(res: &str) -> Option<String> {20	let res = if let Some(v) = res.strip_prefix("error: ") {21		if let Some((first_line, next)) = v.split_once('\n') {22			format!("{first_line}\n{}", unindent::unindent(next))23		} else {24			v.trim_start().to_owned()25		}26	} else if let Some(v) = res.strip_prefix("error:\n") {27		let mut v = v.to_owned();28		v.insert(0, '\n');29		unindent::unindent(&v).trim_start().to_owned()30	} else {31		return None;32	};33	let res = res.trim_end();34	Some(35		res.replace('Â', "")36			.split('\n')37			.map(|l| l.strip_prefix(\u{80}¦ ").unwrap_or(l))38			.join("\n"),39	)40}41pub struct NixSessionPool {42	pub flake: OsString,43	pub nix_args: Vec<OsString>,44}4546#[derive(Debug)]47pub struct NixPoolError(anyhow::Error);48impl From<anyhow::Error> for NixPoolError {49	fn from(value: anyhow::Error) -> Self {50		Self(value)51	}52}53impl std::error::Error for NixPoolError {}54impl std::fmt::Display for NixPoolError {55	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {56		self.0.fmt(f)57	}58}5960impl r2d2::ManageConnection for NixSessionPool {61	type Connection = NixSession;62	type Error = NixPoolError;6364	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {65		Ok(NixSession::new(&self.flake, &self.nix_args, None)?)66	}6768	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {69		let res = conn.expression_result("2 + 2")?;70		if res != "4" {71			return Err(anyhow!("basic expression failed").into());72		}73		Ok(())74	}7576	fn has_broken(&self, conn: &mut Self::Connection) -> bool {77		conn.finished78	}79}8081pub struct NixSession {82	session: PtyReplSession,83	next_id: u32,84	free_list: Vec<u32>,85	finished: bool,86}87impl NixSession {88	fn new(flake: &OsString, args: &[OsString], timeout: Option<u64>) -> Result<Self> {89		let mut cmd = std::process::Command::new("nix");90		cmd.arg("repl");91		cmd.arg(flake);92		for arg in args {93			cmd.arg(arg);94		}95		cmd.env("TERM", "dumb");96		cmd.env("NO_COLOR", "1");97		let pty_session = rexpect::session::spawn_command(cmd, timeout)?;98		let mut repl = PtyReplSession {99			prompt: "nix-repl> ".to_string(),100			pty_session,101			quit_command: Some(":q".to_string()),102			echo_on: true,103		};104		repl.wait_for_prompt()?;105		Ok(Self {106			session: repl,107			next_id: 0,108			free_list: vec![],109			finished: false,110		})111	}112	fn expression_result(&mut self, cmd: &str) -> Result<String> {113		dbg!(cmd);114		self.session.send_line(cmd)?;115		dbg!("waiting");116		let result = self.session.wait_for_prompt()?;117		let result = strip_ansi_escapes::strip_str(&result);118		let result = result.trim();119		dbg!(result);120		Ok(result.to_owned())121	}122	fn json_result<V: DeserializeOwned>(&mut self, cmd: &str) -> Result<V> {123		let v = match self.expression_result(&format!("builtins.toJSON ({cmd})")) {124			Ok(v) => {125				if let Some(e) = parse_error(&v) {126					bail!("{e}")127				}128				v129			}130			Err(e) => {131				self.finished = true;132				bail!("{e}")133			}134		};135		// Remove outer quoting136		let v: String = serde_json::from_str(&v)?;137		Ok(serde_json::from_str(&v)?)138	}139	/// Id should be immediately used140	fn allocate_id(&mut self) -> u32 {141		if let Some(free) = self.free_list.pop() {142			free143		} else {144			let v = self.next_id;145			self.next_id += 1;146			v147		}148	}149	fn allocate_result(&mut self, cmd: &str) -> Result<u32> {150		let id = self.allocate_id();151		match self.expression_result(&format!("sess_field_{id} = ({cmd})")) {152			Ok(v) => {153				if let Some(e) = parse_error(&v) {154					self.free_list.push(id);155					bail!("{e}")156				}157			}158			Err(e) => {159				self.finished = true;160			}161		}162163		Ok(id)164	}165	/// Nix has no way to deallocate variable, yet GC will correct everything not reachable.166	fn free_id(&mut self, id: u32) {167		if let Err(e) = self.expression_result(&format!("sess_field_{id} = null")) {168			self.finished = true;169		} else {170			self.free_list.push(id)171		}172	}173}174175#[derive(Clone, Debug)]176enum Index {177	String(String),178	Idx(u32),179}180181pub struct Field {182	full_path: Vec<Index>,183	session: Arc<Mutex<PooledConnection<NixSessionPool>>>,184	value: Option<u32>,185}186impl Field {187	pub fn root(conn: PooledConnection<NixSessionPool>) -> Self {188		Self {189			full_path: vec![],190			session: Arc::new(Mutex::new(conn)),191			value: None,192		}193	}194	pub fn get_field_deep<'a>(&self, name: impl IntoIterator<Item = &'a str>) -> Result<Self> {195		let mut iter = name.into_iter();196197		let mut full_path = self.full_path.clone();198		let mut query = if let Some(id) = self.value {199			format!("sess_field_{id}")200		} else {201			let first = iter.next().expect("name not empty");202			ensure!(203				!(first.contains('.') | first.contains(' ')),204				"bad name for root query: {first}"205			);206			full_path.push(Index::String(first.to_string()));207			first.to_string()208		};209		for v in iter {210			full_path.push(Index::String(v.to_string()));211			// Escape212			let escaped = nixlike::serialize(v)?;213			let escaped = escaped.trim();214			query.push('.');215			query.push_str(escaped);216		}217218		let vid = self219			.session220			.lock()221			.unwrap()222			.allocate_result(&query)223			.with_context(|| format!("full path: {:?}", full_path))?;224		Ok(Self {225			full_path,226			session: self.session.clone(),227			value: Some(vid),228		})229	}230	pub fn get_field<'a>(&self, name: &str) -> Result<Self> {231		self.get_field_deep([name])232	}233	pub fn as_json<V: DeserializeOwned>(&self) -> Result<V> {234		let id = self.value.expect("can't serialize root field");235		self.session236			.lock()237			.unwrap()238			.json_result(&format!("sess_field_{id}"))239			.with_context(|| format!("full path: {:?}", self.full_path))240	}241	pub fn list_fields(&self) -> Result<Vec<String>> {242		let id = self.value.expect("can't list root fields");243		self.session244			.lock()245			.unwrap()246			.json_result(&format!("builtins.attrNames sess_field_{id}"))247			.with_context(|| format!("full path: {:?}", self.full_path))248	}249}250impl Drop for Field {251	fn drop(&mut self) {252		if let Some(id) = self.value {253			self.session.lock().unwrap().free_id(id)254		}255	}256}