git.delta.rocks / jrsonnet / refs/commits / 7de62040eec5

difftreelog

refactor replace builtins.currentSystem with pure alternative

Yaroslav Bolyukin2024-11-30parent: #3e7b063.patch.diff
in: trunk

5 files changed

addedcrates/fleet-base/build.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/fleet-base/build.rs
@@ -0,0 +1,15 @@
+use std::env;
+
+fn main() {
+	let target = env::var("TARGET").expect("TARGET env var is set by cargo");
+
+	let nix_system = if target.starts_with("x86_64-unknown-linux-") {
+		"x86_64-linux"
+	} else if target.starts_with("aarch64-unknown-linux-") {
+		"aarch64-linux"
+	} else {
+		panic!("unknown nix system name for rust {target} triple!");
+	};
+
+	println!("cargo:rustc-env=NIX_SYSTEM={nix_system}");
+}
modifiedcrates/fleet-base/src/opts.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/opts.rs
+++ b/crates/fleet-base/src/opts.rs
@@ -8,7 +8,7 @@
 
 use anyhow::Result;
 use clap::Parser;
-use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};
+use nix_eval::{nix_go, util::assert_warn, NixSessionPool, Value};
 use nom::{
 	bytes::complete::take_while1,
 	character::complete::char,
@@ -85,14 +85,16 @@
 
 	/// Override detected system for host, to perform builds via
 	/// binfmt-declared qemu instead of trying to crosscompile
-	// TODO: Remove, as it is not used anymore.
-	#[clap(long, default_value = "detect")]
+	#[clap(long, default_value = env!("NIX_SYSTEM"))]
 	pub local_system: String,
 }
 
 impl FleetOpts {
-	pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
-		let mut out = Vec::new();	
+	pub async fn filter_skipped(
+		&self,
+		hosts: impl IntoIterator<Item = ConfigHost>,
+	) -> Result<Vec<ConfigHost>> {
+		let mut out = Vec::new();
 		for host in hosts {
 			if self.should_skip(&host).await? {
 				continue;
@@ -182,15 +184,15 @@
 	pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {
 		let directory = current_dir()?;
 
-		let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;
+		let pool = NixSessionPool::new(
+			directory.as_os_str().to_owned(),
+			nix_args.clone(),
+			self.local_system.clone(),
+		)
+		.await?;
 		let nix_session = pool.get().await?;
 
 		let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;
-		let local_system = if self.local_system == "detect" {
-			nix_go_json!(builtins_field.currentSystem)
-		} else {
-			self.local_system.clone()
-		};
 
 		let mut fleet_data_path = directory.clone();
 		fleet_data_path.push("fleet.nix");
@@ -210,14 +212,14 @@
 
 		let default_pkgs = nix_go!(nixpkgs(Obj {
 			overlays,
-			system: local_system.clone(),
+			system: self.local_system.clone(),
 		}));
 
 		Ok(Config(Arc::new(FleetConfigInternals {
 			nix_session,
 			directory,
 			data,
-			local_system,
+			local_system: self.local_system.clone(),
 			nix_args,
 			config_field,
 			default_pkgs,
modifiedcrates/nix-eval/src/lib.rsdiffbeforeafterboth
--- a/crates/nix-eval/src/lib.rs
+++ b/crates/nix-eval/src/lib.rs
@@ -41,8 +41,8 @@
 
 #[instrument(skip(session, values))]
 async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {
+	let system = session.0.lock().await.nix_system.clone();
 	let builtins = Value::binding(session, "builtins").await?;
-	let system = nix_go!(builtins.currentSystem);
 	let drv = nix_go!(builtins.derivation(Obj {
 		system,
 		name,
modifiedcrates/nix-eval/src/pool.rsdiffbeforeafterboth
--- a/crates/nix-eval/src/pool.rs
+++ b/crates/nix-eval/src/pool.rs
@@ -1,18 +1,23 @@
-use std::ffi::OsString;
-use std::sync::{Arc, OnceLock};
+use std::{
+	ffi::OsString,
+	sync::{Arc, OnceLock},
+};
 
 use r2d2::Pool;
 
-use crate::session::NixSessionInner;
-use crate::{Error, NixSession, Result};
+use crate::{session::NixSessionInner, Error, NixSession, Result};
 
 pub struct NixSessionPool(Pool<NixSessionPoolInner>);
 impl NixSessionPool {
-	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {
+	pub async fn new(flake: OsString, nix_args: Vec<OsString>, nix_system: String) -> Result<Self> {
 		let inner = tokio::task::block_in_place(|| {
 			r2d2::Builder::<NixSessionPoolInner>::new()
 				.min_idle(Some(0))
-				.build(NixSessionPoolInner { flake, nix_args })
+				.build(NixSessionPoolInner {
+					flake,
+					nix_args,
+					nix_system,
+				})
 		})?;
 		Ok(Self(inner))
 	}
@@ -25,6 +30,7 @@
 pub(crate) struct NixSessionPoolInner {
 	flake: OsString,
 	nix_args: Vec<OsString>,
+	pub(crate) nix_system: String,
 }
 
 impl r2d2::ManageConnection for NixSessionPoolInner {
@@ -38,6 +44,7 @@
 		Ok(futures::executor::block_on(NixSessionInner::new(
 			self.flake.as_os_str(),
 			self.nix_args.iter().map(OsString::as_os_str),
+			self.nix_system.clone(),
 		))?)
 	}
 
modifiedcrates/nix-eval/src/session.rsdiffbeforeafterboth
after · crates/nix-eval/src/session.rs
1use std::{ffi::OsStr, num::ParseIntError, process::Stdio, sync::Arc};23use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};4use futures::StreamExt;5use itertools::Itertools as _;6use serde::{de::DeserializeOwned, Deserialize};7use thiserror::Error;8use tokio::{9	io::AsyncWriteExt,10	process::{ChildStderr, ChildStdin, ChildStdout, Command},11	select,12	sync::{mpsc, oneshot, Mutex},13};14use tokio_util::codec::{FramedRead, LinesCodec};15use tracing::{debug, error, warn, Level};1617#[derive(Error, Debug, Clone)]18pub enum Error {19	#[error("failed to create nix repl session: {0}")]20	SessionInit(&'static str),21	#[error("unexpected end of output, nix crashed?")]22	MissingDelimiter,2324	#[error("expression did'nt produce any output")]25	ExpectedOutput,26	#[error("expression produced output, which is unexpected")]27	UnexpectedOutput,2829	#[error("unexpected expression output type")]30	InvalidType,3132	#[error("failed to build attr {attribute}:\n{error}")]33	BuildFailed { attribute: String, error: String },3435	#[error("output: {0}")]36	Json(Arc<serde_json::Error>),37	// int outputs are too specific, and should not be used,38	// thus error is ok to be not informative.39	#[error("int output: {0}")]40	Int(ParseIntError),41	#[error("pool: {0}")]42	Pool(Arc<r2d2::Error>),43	#[error("io: {0}")]44	Io(Arc<std::io::Error>),4546	// TODO: Should be done by wrapper/in different type.47	#[error("at {0}: {1}")]48	InContext(String, Box<Self>),4950	#[error("error: {0}")]51	NixError(String),52}53impl From<r2d2::Error> for Error {54	fn from(value: r2d2::Error) -> Self {55		Self::Pool(Arc::new(value))56	}57}58impl From<std::io::Error> for Error {59	fn from(value: std::io::Error) -> Self {60		Self::Io(Arc::new(value))61	}62}63impl From<serde_json::Error> for Error {64	fn from(value: serde_json::Error) -> Self {65		Self::Json(Arc::new(value))66	}67}68impl Error {69	pub(crate) fn context(self, context: String) -> Self {70		Self::InContext(context, Box::new(self))71	}72}73pub type Result<T, E = Error> = std::result::Result<T, E>;7475enum OutputLine {76	Out(String),77	Err(String),78}79struct OutputHandler {80	rx: mpsc::Receiver<OutputLine>,81	_cancel_handle: oneshot::Receiver<()>,82}83impl OutputHandler {84	fn new(out: ChildStdout, err: ChildStderr) -> Self {85		let mut out = FramedRead::new(out, LinesCodec::new());86		let mut err = FramedRead::new(err, LinesCodec::new());87		let (tx, rx) = mpsc::channel(20);88		let (mut cancelled, _cancel_handle) = oneshot::channel();89		tokio::spawn(async move {90			loop {91				select! {92					// We should receive errors earlier than synchronization93					biased;94					e = err.next() => {95						let Some(Ok(e)) = e else {96							if e.is_some() {97								error!("bad repl stderr: {e:?}");98							}99							continue;100						};101						let _ = tx.send(OutputLine::Err(e)).await;102					}103					o = out.next() => {104						let Some(Ok(o)) = o else {105							if o.is_some() {106								error!("bad repl stdout: {o:?}");107							}108							continue;109						};110						let _ = tx.send(OutputLine::Out(o)).await;111					}112					// Reader doesn't care about stdout, as this is cancelled.113					// Error still might be useful, to process leftover span closures?114					_ = cancelled.closed() => {115						break;116					}117				}118			}119		});120		Self { rx, _cancel_handle }121	}122	async fn next(&mut self) -> Option<OutputLine> {123		self.rx.recv().await124	}125}126127#[must_use]128struct ErrorCollector<'i, H> {129	collected: Vec<String>,130	inner: &'i mut H,131}132impl<'i, H> ErrorCollector<'i, H> {133	fn new(inner: &'i mut H) -> Self {134		Self {135			collected: vec![],136			inner,137		}138	}139}140impl<H> ErrorCollector<'_, H> {141	fn handle_line_inner(&mut self, msg: &str) -> bool {142		let Some(msg) = msg.strip_prefix("@nix ") else {143			return false;144		};145		#[derive(Deserialize)]146		struct ErrorAction {147			action: String,148			level: u32,149			msg: String,150		}151		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {152			return false;153		};154		if act.action != "msg" || act.level != 0 {155			return false;156		}157		self.collected.push(act.msg);158		true159	}160	fn finish(self) -> Result<()> {161		// fn dedent(s: String) -> String {162		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)163		// }164		if !self.collected.is_empty() {165			return Err(Error::NixError(166				self.collected167					.iter()168					.map(|v| {169						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {170							let v = unindent::unindent(f.trim_start());171							v.trim().to_owned()172						} else {173							v.to_owned()174						}175					})176					.join("\n")177					.to_string(),178			));179		}180		Ok(())181	}182	fn flush(self) {183		for line in self.collected {184			warn!("{line}");185		}186	}187}188impl<H: Handler> Handler for ErrorCollector<'_, H> {189	fn handle_line(&mut self, e: &str) {190		if self.handle_line_inner(e) {191			return;192		}193		self.inner.handle_line(e)194	}195}196197pub struct NixSessionInner {198	full_delimiter: String,199	nix_handler: ClonableHandler<NixHandler>,200	out: OutputHandler,201	stdin: ChildStdin,202	string_wrapping: (String, String),203	number_wrapping: (String, String),204205	executing_command: Arc<Mutex<()>>,206207	next_id: u32,208	pub(crate) free_list: Vec<u32>,209210	pub nix_system: String,211}212213/// Discover inter-message repl delimiter214const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";215/// Discover formatting around strings216const TRAIN_STRING: &str = "\"TRAIN_STRING\"";217/// Discover formatting around numbers218const TRAIN_NUMBER: &str = "13141516";219// Other types of formatting are not discovered, because they are not used, JSON serialization is used instead220// Techically, number training is also not required, because numbers can be converted to string too...221// Eh, I'll remove it later.222223impl NixSessionInner {224	pub(crate) async fn new(225		flake: &OsStr,226		extra_args: impl IntoIterator<Item = &OsStr>,227		nix_system: String,228	) -> Result<Self> {229		let mut cmd = Command::new("nix");230		cmd.arg("repl")231			.args(["--option", "pure-eval", "true"])232			.arg(flake)233			.arg("--log-format")234			.arg("internal-json");235		for arg in extra_args {236			cmd.arg(arg);237		}238		cmd.stdin(Stdio::piped());239		cmd.stdout(Stdio::piped());240		cmd.stderr(Stdio::piped());241		let cmd = cmd.spawn()?;242		let stdout = cmd.stdout.unwrap();243		let stderr = cmd.stderr.unwrap();244		let mut out = OutputHandler::new(stdout, stderr);245		let mut stdin = cmd.stdin.unwrap();246		// Standard repl hello doesn't work with internal-json logger247		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;248		stdin.write_all(b"\n").await?;249		stdin.flush().await?;250		let nix_handler = NixHandler::default();251		let mut full_delimiter = None;252		let mut errors = vec![];253		while let Some(line) = out.next().await {254			let line = match line {255				OutputLine::Out(o) => o,256				OutputLine::Err(_e) => {257					// Handle startup errors, but skip repl hello?258					errors.push(_e);259					continue;260				}261			};262			if line.contains(REPL_DELIMITER) {263				debug!("discovered repl delimiter with added colors: {line}");264				full_delimiter = Some(line.to_owned());265				break;266			}267		}268		let Some(full_delimiter) = full_delimiter else {269			for e in errors {270				error!("{e}");271			}272			return Err(Error::SessionInit("failed to discover delimiter"));273		};274		let mut res = Self {275			full_delimiter,276			nix_handler: ClonableHandler::new(nix_handler),277			out,278			stdin,279			string_wrapping: Default::default(),280			number_wrapping: Default::default(),281282			executing_command: Arc::new(Mutex::new(())),283284			next_id: 0,285			free_list: vec![],286287			nix_system,288		};289		res.train().await?;290		Ok(res)291	}292	async fn train(&mut self) -> Result<()> {293		{294			let full_string = self295				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)296				.await?;297			let string_offset = full_string.find(TRAIN_STRING).expect("contained");298			let string_prefix = &full_string[..string_offset];299			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];300			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());301		}302		{303			let full_number = self304				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)305				.await?;306			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");307			let number_prefix = &full_number[..number_offset];308			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];309			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());310		}311		Ok(())312	}313	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {314		if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {315			let cmd_str = String::from_utf8_lossy(cmd.as_ref());316			tracing::debug!("{cmd_str}");317		};318		self.stdin.write_all(cmd.as_ref()).await?;319		self.stdin.write_all(b"\n").await?;320		Ok(())321	}322	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {323		let mut out = String::new();324		while let Some(line) = self.out.next().await {325			let line = match line {326				OutputLine::Out(out) => out,327				OutputLine::Err(err) => {328					err_handler.handle_line(&err);329					continue;330				}331			};332			if line == self.full_delimiter {333				return Ok(out);334			}335			if !out.is_empty() {336				out.push('\n');337			}338			out.push_str(&line);339		}340		Err(Error::MissingDelimiter)341	}342	pub(crate) async fn execute_expression_number(343		&mut self,344		expr: impl AsRef<[u8]>,345	) -> Result<u64> {346		let num = self.number_wrapping.clone();347		let n = self.execute_expression_wrapping(expr, &num).await?;348		n.parse::<u64>().map_err(Error::Int)349	}350	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {351		// builtins.toJSON escapes some thing in incorrect way, e.g escaped "$" in "\${" is being outputed as "\$",352		// while this escape should be removed as it is intended for nix itself, not for json output.353		//354		// This regex only allows \$ in the beginning of the string, it is easier to implement correctly.355		// TODO: Add peg parser for nix-produced JSON?..356		let regex = regex::Regex::new(r#"(?<prefix>[: {,\[]\\")\\\$"#).expect("fixup json");357358		let num = self.string_wrapping.clone();359		let n = self.execute_expression_wrapping(expr, &num).await?;360		let n = regex.replace_all(&n, "$prefix$$");361		let str: String = serde_json::from_str(&n)?;362		Ok(str)363	}364	pub(crate) async fn execute_expression_to_json<V: DeserializeOwned>(365		&mut self,366		expr: impl AsRef<[u8]>,367	) -> Result<V> {368		let mut fexpr = b"builtins.toJSON (".to_vec();369		fexpr.extend_from_slice(expr.as_ref());370		fexpr.push(b')');371372		Ok(serde_json::from_str(373			&self.execute_expression_string(fexpr).await?,374		)?)375	}376	async fn execute_expression_wrapping(377		&mut self,378		expr: impl AsRef<[u8]>,379		wrapping: &(String, String),380	) -> Result<String> {381		let mut nix_handler = self.nix_handler.clone();382		let mut collected = ErrorCollector::new(&mut nix_handler);383		let res = self.execute_expression_raw(expr, &mut collected).await?;384		if res.is_empty() {385			collected.finish()?;386			return Err(Error::ExpectedOutput);387		} else {388			collected.flush()389		};390		let Some(res) = res.strip_prefix(&wrapping.0) else {391			return Err(Error::InvalidType);392		};393		let Some(res) = res.strip_suffix(&wrapping.1) else {394			return Err(Error::InvalidType);395		};396		Ok(res.to_owned())397	}398	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {399		let mut nix_handler = self.nix_handler.clone();400		let mut collected = ErrorCollector::new(&mut nix_handler);401		let v = self.execute_expression_raw(expr, &mut collected).await?;402		collected.finish()?;403		if !v.is_empty() {404			return Err(Error::UnexpectedOutput);405		}406		Ok(())407	}408	pub(crate) async fn execute_expression_raw(409		&mut self,410		expr: impl AsRef<[u8]>,411		err_handler: &mut dyn Handler,412	) -> Result<String> {413		// Prevent two commands from being executed in parallel, messing with each other.414		let _lock = self.executing_command.clone();415		let _guard = _lock.lock().await;416417		self.send_command(expr).await?;418		// It will be echoed419		self.send_command(REPL_DELIMITER).await?;420		self.read_until_delimiter(err_handler).await421	}422	pub(crate) async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {423		let id = self.allocate_id();424		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))425			.await?;426		Ok(id)427	}428429	/// Id should be immediately used430	fn allocate_id(&mut self) -> u32 {431		if let Some(free) = self.free_list.pop() {432			free433		} else {434			let v = self.next_id;435			self.next_id += 1;436			v437		}438	}439	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.440	// async fn free_id(&mut self, id: u32) -> Result<()> {441	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))442	// 		.await?;443	// 	self.free_list.push(id);444	// 	Ok(())445	// }446}