difftreelog
refactor replace builtins.currentSystem with pure alternative
in: trunk
5 files changed
crates/fleet-base/build.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/fleet-base/build.rs
@@ -0,0 +1,15 @@
+use std::env;
+
+fn main() {
+ let target = env::var("TARGET").expect("TARGET env var is set by cargo");
+
+ let nix_system = if target.starts_with("x86_64-unknown-linux-") {
+ "x86_64-linux"
+ } else if target.starts_with("aarch64-unknown-linux-") {
+ "aarch64-linux"
+ } else {
+ panic!("unknown nix system name for rust {target} triple!");
+ };
+
+ println!("cargo:rustc-env=NIX_SYSTEM={nix_system}");
+}
crates/fleet-base/src/opts.rsdiffbeforeafterboth--- a/crates/fleet-base/src/opts.rs
+++ b/crates/fleet-base/src/opts.rs
@@ -8,7 +8,7 @@
use anyhow::Result;
use clap::Parser;
-use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};
+use nix_eval::{nix_go, util::assert_warn, NixSessionPool, Value};
use nom::{
bytes::complete::take_while1,
character::complete::char,
@@ -85,14 +85,16 @@
/// Override detected system for host, to perform builds via
/// binfmt-declared qemu instead of trying to crosscompile
- // TODO: Remove, as it is not used anymore.
- #[clap(long, default_value = "detect")]
+ #[clap(long, default_value = env!("NIX_SYSTEM"))]
pub local_system: String,
}
impl FleetOpts {
- pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
- let mut out = Vec::new();
+ pub async fn filter_skipped(
+ &self,
+ hosts: impl IntoIterator<Item = ConfigHost>,
+ ) -> Result<Vec<ConfigHost>> {
+ let mut out = Vec::new();
for host in hosts {
if self.should_skip(&host).await? {
continue;
@@ -182,15 +184,15 @@
pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {
let directory = current_dir()?;
- let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;
+ let pool = NixSessionPool::new(
+ directory.as_os_str().to_owned(),
+ nix_args.clone(),
+ self.local_system.clone(),
+ )
+ .await?;
let nix_session = pool.get().await?;
let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;
- let local_system = if self.local_system == "detect" {
- nix_go_json!(builtins_field.currentSystem)
- } else {
- self.local_system.clone()
- };
let mut fleet_data_path = directory.clone();
fleet_data_path.push("fleet.nix");
@@ -210,14 +212,14 @@
let default_pkgs = nix_go!(nixpkgs(Obj {
overlays,
- system: local_system.clone(),
+ system: self.local_system.clone(),
}));
Ok(Config(Arc::new(FleetConfigInternals {
nix_session,
directory,
data,
- local_system,
+ local_system: self.local_system.clone(),
nix_args,
config_field,
default_pkgs,
crates/nix-eval/src/lib.rsdiffbeforeafterboth--- a/crates/nix-eval/src/lib.rs
+++ b/crates/nix-eval/src/lib.rs
@@ -41,8 +41,8 @@
#[instrument(skip(session, values))]
async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {
+ let system = session.0.lock().await.nix_system.clone();
let builtins = Value::binding(session, "builtins").await?;
- let system = nix_go!(builtins.currentSystem);
let drv = nix_go!(builtins.derivation(Obj {
system,
name,
crates/nix-eval/src/pool.rsdiffbeforeafterboth--- a/crates/nix-eval/src/pool.rs
+++ b/crates/nix-eval/src/pool.rs
@@ -1,18 +1,23 @@
-use std::ffi::OsString;
-use std::sync::{Arc, OnceLock};
+use std::{
+ ffi::OsString,
+ sync::{Arc, OnceLock},
+};
use r2d2::Pool;
-use crate::session::NixSessionInner;
-use crate::{Error, NixSession, Result};
+use crate::{session::NixSessionInner, Error, NixSession, Result};
pub struct NixSessionPool(Pool<NixSessionPoolInner>);
impl NixSessionPool {
- pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {
+ pub async fn new(flake: OsString, nix_args: Vec<OsString>, nix_system: String) -> Result<Self> {
let inner = tokio::task::block_in_place(|| {
r2d2::Builder::<NixSessionPoolInner>::new()
.min_idle(Some(0))
- .build(NixSessionPoolInner { flake, nix_args })
+ .build(NixSessionPoolInner {
+ flake,
+ nix_args,
+ nix_system,
+ })
})?;
Ok(Self(inner))
}
@@ -25,6 +30,7 @@
pub(crate) struct NixSessionPoolInner {
flake: OsString,
nix_args: Vec<OsString>,
+ pub(crate) nix_system: String,
}
impl r2d2::ManageConnection for NixSessionPoolInner {
@@ -38,6 +44,7 @@
Ok(futures::executor::block_on(NixSessionInner::new(
self.flake.as_os_str(),
self.nix_args.iter().map(OsString::as_os_str),
+ self.nix_system.clone(),
))?)
}
crates/nix-eval/src/session.rsdiffbeforeafterboth1use std::{ffi::OsStr, num::ParseIntError, process::Stdio, sync::Arc};23use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};4use futures::StreamExt;5use itertools::Itertools as _;6use serde::{de::DeserializeOwned, Deserialize};7use thiserror::Error;8use tokio::{9 io::AsyncWriteExt,10 process::{ChildStderr, ChildStdin, ChildStdout, Command},11 select,12 sync::{mpsc, oneshot, Mutex},13};14use tokio_util::codec::{FramedRead, LinesCodec};15use tracing::{debug, error, warn, Level};1617#[derive(Error, Debug, Clone)]18pub enum Error {19 #[error("failed to create nix repl session: {0}")]20 SessionInit(&'static str),21 #[error("unexpected end of output, nix crashed?")]22 MissingDelimiter,2324 #[error("expression did'nt produce any output")]25 ExpectedOutput,26 #[error("expression produced output, which is unexpected")]27 UnexpectedOutput,2829 #[error("unexpected expression output type")]30 InvalidType,3132 #[error("failed to build attr {attribute}:\n{error}")]33 BuildFailed { attribute: String, error: String },3435 #[error("output: {0}")]36 Json(Arc<serde_json::Error>),37 // int outputs are too specific, and should not be used,38 // thus error is ok to be not informative.39 #[error("int output: {0}")]40 Int(ParseIntError),41 #[error("pool: {0}")]42 Pool(Arc<r2d2::Error>),43 #[error("io: {0}")]44 Io(Arc<std::io::Error>),4546 // TODO: Should be done by wrapper/in different type.47 #[error("at {0}: {1}")]48 InContext(String, Box<Self>),4950 #[error("error: {0}")]51 NixError(String),52}53impl From<r2d2::Error> for Error {54 fn from(value: r2d2::Error) -> Self {55 Self::Pool(Arc::new(value))56 }57}58impl From<std::io::Error> for Error {59 fn from(value: std::io::Error) -> Self {60 Self::Io(Arc::new(value))61 }62}63impl From<serde_json::Error> for Error {64 fn from(value: serde_json::Error) -> Self {65 Self::Json(Arc::new(value))66 }67}68impl Error {69 pub(crate) fn context(self, context: String) -> Self {70 Self::InContext(context, Box::new(self))71 }72}73pub type Result<T, E = Error> = std::result::Result<T, E>;7475enum OutputLine {76 Out(String),77 Err(String),78}79struct OutputHandler {80 rx: mpsc::Receiver<OutputLine>,81 _cancel_handle: oneshot::Receiver<()>,82}83impl OutputHandler {84 fn new(out: ChildStdout, err: ChildStderr) -> Self {85 let mut out = FramedRead::new(out, LinesCodec::new());86 let mut err = FramedRead::new(err, LinesCodec::new());87 let (tx, rx) = mpsc::channel(20);88 let (mut cancelled, _cancel_handle) = oneshot::channel();89 tokio::spawn(async move {90 loop {91 select! {92 // We should receive errors earlier than synchronization93 biased;94 e = err.next() => {95 let Some(Ok(e)) = e else {96 if e.is_some() {97 error!("bad repl stderr: {e:?}");98 }99 continue;100 };101 let _ = tx.send(OutputLine::Err(e)).await;102 }103 o = out.next() => {104 let Some(Ok(o)) = o else {105 if o.is_some() {106 error!("bad repl stdout: {o:?}");107 }108 continue;109 };110 let _ = tx.send(OutputLine::Out(o)).await;111 }112 // Reader doesn't care about stdout, as this is cancelled.113 // Error still might be useful, to process leftover span closures?114 _ = cancelled.closed() => {115 break;116 }117 }118 }119 });120 Self { rx, _cancel_handle }121 }122 async fn next(&mut self) -> Option<OutputLine> {123 self.rx.recv().await124 }125}126127#[must_use]128struct ErrorCollector<'i, H> {129 collected: Vec<String>,130 inner: &'i mut H,131}132impl<'i, H> ErrorCollector<'i, H> {133 fn new(inner: &'i mut H) -> Self {134 Self {135 collected: vec![],136 inner,137 }138 }139}140impl<H> ErrorCollector<'_, H> {141 fn handle_line_inner(&mut self, msg: &str) -> bool {142 let Some(msg) = msg.strip_prefix("@nix ") else {143 return false;144 };145 #[derive(Deserialize)]146 struct ErrorAction {147 action: String,148 level: u32,149 msg: String,150 }151 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {152 return false;153 };154 if act.action != "msg" || act.level != 0 {155 return false;156 }157 self.collected.push(act.msg);158 true159 }160 fn finish(self) -> Result<()> {161 // fn dedent(s: String) -> String {162 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)163 // }164 if !self.collected.is_empty() {165 return Err(Error::NixError(166 self.collected167 .iter()168 .map(|v| {169 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {170 let v = unindent::unindent(f.trim_start());171 v.trim().to_owned()172 } else {173 v.to_owned()174 }175 })176 .join("\n")177 .to_string(),178 ));179 }180 Ok(())181 }182 fn flush(self) {183 for line in self.collected {184 warn!("{line}");185 }186 }187}188impl<H: Handler> Handler for ErrorCollector<'_, H> {189 fn handle_line(&mut self, e: &str) {190 if self.handle_line_inner(e) {191 return;192 }193 self.inner.handle_line(e)194 }195}196197pub struct NixSessionInner {198 full_delimiter: String,199 nix_handler: ClonableHandler<NixHandler>,200 out: OutputHandler,201 stdin: ChildStdin,202 string_wrapping: (String, String),203 number_wrapping: (String, String),204205 executing_command: Arc<Mutex<()>>,206207 next_id: u32,208 pub(crate) free_list: Vec<u32>,209}210211/// Discover inter-message repl delimiter212const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";213/// Discover formatting around strings214const TRAIN_STRING: &str = "\"TRAIN_STRING\"";215/// Discover formatting around numbers216const TRAIN_NUMBER: &str = "13141516";217// Other types of formatting are not discovered, because they are not used, JSON serialization is used instead218// Techically, number training is also not required, because numbers can be converted to string too...219// Eh, I'll remove it later.220221impl NixSessionInner {222 pub(crate) async fn new(223 flake: &OsStr,224 extra_args: impl IntoIterator<Item = &OsStr>,225 ) -> Result<Self> {226 let mut cmd = Command::new("nix");227 cmd.arg("repl")228 .args(["--option", "pure-eval", "true"])229 .arg(flake)230 .arg("--log-format")231 .arg("internal-json");232 for arg in extra_args {233 cmd.arg(arg);234 }235 cmd.stdin(Stdio::piped());236 cmd.stdout(Stdio::piped());237 cmd.stderr(Stdio::piped());238 let cmd = cmd.spawn()?;239 let stdout = cmd.stdout.unwrap();240 let stderr = cmd.stderr.unwrap();241 let mut out = OutputHandler::new(stdout, stderr);242 let mut stdin = cmd.stdin.unwrap();243 // Standard repl hello doesn't work with internal-json logger244 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;245 stdin.write_all(b"\n").await?;246 stdin.flush().await?;247 let nix_handler = NixHandler::default();248 let mut full_delimiter = None;249 let mut errors = vec![];250 while let Some(line) = out.next().await {251 let line = match line {252 OutputLine::Out(o) => o,253 OutputLine::Err(_e) => {254 // Handle startup errors, but skip repl hello?255 errors.push(_e);256 continue;257 }258 };259 if line.contains(REPL_DELIMITER) {260 debug!("discovered repl delimiter with added colors: {line}");261 full_delimiter = Some(line.to_owned());262 break;263 }264 }265 let Some(full_delimiter) = full_delimiter else {266 for e in errors {267 error!("{e}");268 }269 return Err(Error::SessionInit("failed to discover delimiter"));270 };271 let mut res = Self {272 full_delimiter,273 nix_handler: ClonableHandler::new(nix_handler),274 out,275 stdin,276 string_wrapping: Default::default(),277 number_wrapping: Default::default(),278279 executing_command: Arc::new(Mutex::new(())),280281 next_id: 0,282 free_list: vec![],283 };284 res.train().await?;285 Ok(res)286 }287 async fn train(&mut self) -> Result<()> {288 {289 let full_string = self290 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)291 .await?;292 let string_offset = full_string.find(TRAIN_STRING).expect("contained");293 let string_prefix = &full_string[..string_offset];294 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];295 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());296 }297 {298 let full_number = self299 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)300 .await?;301 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");302 let number_prefix = &full_number[..number_offset];303 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];304 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());305 }306 Ok(())307 }308 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {309 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {310 let cmd_str = String::from_utf8_lossy(cmd.as_ref());311 tracing::debug!("{cmd_str}");312 };313 self.stdin.write_all(cmd.as_ref()).await?;314 self.stdin.write_all(b"\n").await?;315 Ok(())316 }317 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {318 let mut out = String::new();319 while let Some(line) = self.out.next().await {320 let line = match line {321 OutputLine::Out(out) => out,322 OutputLine::Err(err) => {323 err_handler.handle_line(&err);324 continue;325 }326 };327 if line == self.full_delimiter {328 return Ok(out);329 }330 if !out.is_empty() {331 out.push('\n');332 }333 out.push_str(&line);334 }335 Err(Error::MissingDelimiter)336 }337 pub(crate) async fn execute_expression_number(338 &mut self,339 expr: impl AsRef<[u8]>,340 ) -> Result<u64> {341 let num = self.number_wrapping.clone();342 let n = self.execute_expression_wrapping(expr, &num).await?;343 n.parse::<u64>().map_err(Error::Int)344 }345 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {346 // builtins.toJSON escapes some thing in incorrect way, e.g escaped "$" in "\${" is being outputed as "\$",347 // while this escape should be removed as it is intended for nix itself, not for json output.348 //349 // This regex only allows \$ in the beginning of the string, it is easier to implement correctly.350 // TODO: Add peg parser for nix-produced JSON?..351 let regex = regex::Regex::new(r#"(?<prefix>[: {,\[]\\")\\\$"#).expect("fixup json");352353 let num = self.string_wrapping.clone();354 let n = self.execute_expression_wrapping(expr, &num).await?;355 let n = regex.replace_all(&n, "$prefix$$");356 let str: String = serde_json::from_str(&n)?;357 Ok(str)358 }359 pub(crate) async fn execute_expression_to_json<V: DeserializeOwned>(360 &mut self,361 expr: impl AsRef<[u8]>,362 ) -> Result<V> {363 let mut fexpr = b"builtins.toJSON (".to_vec();364 fexpr.extend_from_slice(expr.as_ref());365 fexpr.push(b')');366367 Ok(serde_json::from_str(368 &self.execute_expression_string(fexpr).await?,369 )?)370 }371 async fn execute_expression_wrapping(372 &mut self,373 expr: impl AsRef<[u8]>,374 wrapping: &(String, String),375 ) -> Result<String> {376 let mut nix_handler = self.nix_handler.clone();377 let mut collected = ErrorCollector::new(&mut nix_handler);378 let res = self.execute_expression_raw(expr, &mut collected).await?;379 if res.is_empty() {380 collected.finish()?;381 return Err(Error::ExpectedOutput);382 } else {383 collected.flush()384 };385 let Some(res) = res.strip_prefix(&wrapping.0) else {386 return Err(Error::InvalidType);387 };388 let Some(res) = res.strip_suffix(&wrapping.1) else {389 return Err(Error::InvalidType);390 };391 Ok(res.to_owned())392 }393 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {394 let mut nix_handler = self.nix_handler.clone();395 let mut collected = ErrorCollector::new(&mut nix_handler);396 let v = self.execute_expression_raw(expr, &mut collected).await?;397 collected.finish()?;398 if !v.is_empty() {399 return Err(Error::UnexpectedOutput);400 }401 Ok(())402 }403 pub(crate) async fn execute_expression_raw(404 &mut self,405 expr: impl AsRef<[u8]>,406 err_handler: &mut dyn Handler,407 ) -> Result<String> {408 // Prevent two commands from being executed in parallel, messing with each other.409 let _lock = self.executing_command.clone();410 let _guard = _lock.lock().await;411412 self.send_command(expr).await?;413 // It will be echoed414 self.send_command(REPL_DELIMITER).await?;415 self.read_until_delimiter(err_handler).await416 }417 pub(crate) async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {418 let id = self.allocate_id();419 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))420 .await?;421 Ok(id)422 }423424 /// Id should be immediately used425 fn allocate_id(&mut self) -> u32 {426 if let Some(free) = self.free_list.pop() {427 free428 } else {429 let v = self.next_id;430 self.next_id += 1;431 v432 }433 }434 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.435 // async fn free_id(&mut self, id: u32) -> Result<()> {436 // self.execute_expression_empty(format!("sess_field_{id} = null"))437 // .await?;438 // self.free_list.push(id);439 // Ok(())440 // }441}1use std::{ffi::OsStr, num::ParseIntError, process::Stdio, sync::Arc};23use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};4use futures::StreamExt;5use itertools::Itertools as _;6use serde::{de::DeserializeOwned, Deserialize};7use thiserror::Error;8use tokio::{9 io::AsyncWriteExt,10 process::{ChildStderr, ChildStdin, ChildStdout, Command},11 select,12 sync::{mpsc, oneshot, Mutex},13};14use tokio_util::codec::{FramedRead, LinesCodec};15use tracing::{debug, error, warn, Level};1617#[derive(Error, Debug, Clone)]18pub enum Error {19 #[error("failed to create nix repl session: {0}")]20 SessionInit(&'static str),21 #[error("unexpected end of output, nix crashed?")]22 MissingDelimiter,2324 #[error("expression did'nt produce any output")]25 ExpectedOutput,26 #[error("expression produced output, which is unexpected")]27 UnexpectedOutput,2829 #[error("unexpected expression output type")]30 InvalidType,3132 #[error("failed to build attr {attribute}:\n{error}")]33 BuildFailed { attribute: String, error: String },3435 #[error("output: {0}")]36 Json(Arc<serde_json::Error>),37 // int outputs are too specific, and should not be used,38 // thus error is ok to be not informative.39 #[error("int output: {0}")]40 Int(ParseIntError),41 #[error("pool: {0}")]42 Pool(Arc<r2d2::Error>),43 #[error("io: {0}")]44 Io(Arc<std::io::Error>),4546 // TODO: Should be done by wrapper/in different type.47 #[error("at {0}: {1}")]48 InContext(String, Box<Self>),4950 #[error("error: {0}")]51 NixError(String),52}53impl From<r2d2::Error> for Error {54 fn from(value: r2d2::Error) -> Self {55 Self::Pool(Arc::new(value))56 }57}58impl From<std::io::Error> for Error {59 fn from(value: std::io::Error) -> Self {60 Self::Io(Arc::new(value))61 }62}63impl From<serde_json::Error> for Error {64 fn from(value: serde_json::Error) -> Self {65 Self::Json(Arc::new(value))66 }67}68impl Error {69 pub(crate) fn context(self, context: String) -> Self {70 Self::InContext(context, Box::new(self))71 }72}73pub type Result<T, E = Error> = std::result::Result<T, E>;7475enum OutputLine {76 Out(String),77 Err(String),78}79struct OutputHandler {80 rx: mpsc::Receiver<OutputLine>,81 _cancel_handle: oneshot::Receiver<()>,82}83impl OutputHandler {84 fn new(out: ChildStdout, err: ChildStderr) -> Self {85 let mut out = FramedRead::new(out, LinesCodec::new());86 let mut err = FramedRead::new(err, LinesCodec::new());87 let (tx, rx) = mpsc::channel(20);88 let (mut cancelled, _cancel_handle) = oneshot::channel();89 tokio::spawn(async move {90 loop {91 select! {92 // We should receive errors earlier than synchronization93 biased;94 e = err.next() => {95 let Some(Ok(e)) = e else {96 if e.is_some() {97 error!("bad repl stderr: {e:?}");98 }99 continue;100 };101 let _ = tx.send(OutputLine::Err(e)).await;102 }103 o = out.next() => {104 let Some(Ok(o)) = o else {105 if o.is_some() {106 error!("bad repl stdout: {o:?}");107 }108 continue;109 };110 let _ = tx.send(OutputLine::Out(o)).await;111 }112 // Reader doesn't care about stdout, as this is cancelled.113 // Error still might be useful, to process leftover span closures?114 _ = cancelled.closed() => {115 break;116 }117 }118 }119 });120 Self { rx, _cancel_handle }121 }122 async fn next(&mut self) -> Option<OutputLine> {123 self.rx.recv().await124 }125}126127#[must_use]128struct ErrorCollector<'i, H> {129 collected: Vec<String>,130 inner: &'i mut H,131}132impl<'i, H> ErrorCollector<'i, H> {133 fn new(inner: &'i mut H) -> Self {134 Self {135 collected: vec![],136 inner,137 }138 }139}140impl<H> ErrorCollector<'_, H> {141 fn handle_line_inner(&mut self, msg: &str) -> bool {142 let Some(msg) = msg.strip_prefix("@nix ") else {143 return false;144 };145 #[derive(Deserialize)]146 struct ErrorAction {147 action: String,148 level: u32,149 msg: String,150 }151 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {152 return false;153 };154 if act.action != "msg" || act.level != 0 {155 return false;156 }157 self.collected.push(act.msg);158 true159 }160 fn finish(self) -> Result<()> {161 // fn dedent(s: String) -> String {162 // s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)163 // }164 if !self.collected.is_empty() {165 return Err(Error::NixError(166 self.collected167 .iter()168 .map(|v| {169 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {170 let v = unindent::unindent(f.trim_start());171 v.trim().to_owned()172 } else {173 v.to_owned()174 }175 })176 .join("\n")177 .to_string(),178 ));179 }180 Ok(())181 }182 fn flush(self) {183 for line in self.collected {184 warn!("{line}");185 }186 }187}188impl<H: Handler> Handler for ErrorCollector<'_, H> {189 fn handle_line(&mut self, e: &str) {190 if self.handle_line_inner(e) {191 return;192 }193 self.inner.handle_line(e)194 }195}196197pub struct NixSessionInner {198 full_delimiter: String,199 nix_handler: ClonableHandler<NixHandler>,200 out: OutputHandler,201 stdin: ChildStdin,202 string_wrapping: (String, String),203 number_wrapping: (String, String),204205 executing_command: Arc<Mutex<()>>,206207 next_id: u32,208 pub(crate) free_list: Vec<u32>,209210 pub nix_system: String,211}212213/// Discover inter-message repl delimiter214const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";215/// Discover formatting around strings216const TRAIN_STRING: &str = "\"TRAIN_STRING\"";217/// Discover formatting around numbers218const TRAIN_NUMBER: &str = "13141516";219// Other types of formatting are not discovered, because they are not used, JSON serialization is used instead220// Techically, number training is also not required, because numbers can be converted to string too...221// Eh, I'll remove it later.222223impl NixSessionInner {224 pub(crate) async fn new(225 flake: &OsStr,226 extra_args: impl IntoIterator<Item = &OsStr>,227 nix_system: String,228 ) -> Result<Self> {229 let mut cmd = Command::new("nix");230 cmd.arg("repl")231 .args(["--option", "pure-eval", "true"])232 .arg(flake)233 .arg("--log-format")234 .arg("internal-json");235 for arg in extra_args {236 cmd.arg(arg);237 }238 cmd.stdin(Stdio::piped());239 cmd.stdout(Stdio::piped());240 cmd.stderr(Stdio::piped());241 let cmd = cmd.spawn()?;242 let stdout = cmd.stdout.unwrap();243 let stderr = cmd.stderr.unwrap();244 let mut out = OutputHandler::new(stdout, stderr);245 let mut stdin = cmd.stdin.unwrap();246 // Standard repl hello doesn't work with internal-json logger247 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;248 stdin.write_all(b"\n").await?;249 stdin.flush().await?;250 let nix_handler = NixHandler::default();251 let mut full_delimiter = None;252 let mut errors = vec![];253 while let Some(line) = out.next().await {254 let line = match line {255 OutputLine::Out(o) => o,256 OutputLine::Err(_e) => {257 // Handle startup errors, but skip repl hello?258 errors.push(_e);259 continue;260 }261 };262 if line.contains(REPL_DELIMITER) {263 debug!("discovered repl delimiter with added colors: {line}");264 full_delimiter = Some(line.to_owned());265 break;266 }267 }268 let Some(full_delimiter) = full_delimiter else {269 for e in errors {270 error!("{e}");271 }272 return Err(Error::SessionInit("failed to discover delimiter"));273 };274 let mut res = Self {275 full_delimiter,276 nix_handler: ClonableHandler::new(nix_handler),277 out,278 stdin,279 string_wrapping: Default::default(),280 number_wrapping: Default::default(),281282 executing_command: Arc::new(Mutex::new(())),283284 next_id: 0,285 free_list: vec![],286287 nix_system,288 };289 res.train().await?;290 Ok(res)291 }292 async fn train(&mut self) -> Result<()> {293 {294 let full_string = self295 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)296 .await?;297 let string_offset = full_string.find(TRAIN_STRING).expect("contained");298 let string_prefix = &full_string[..string_offset];299 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];300 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());301 }302 {303 let full_number = self304 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)305 .await?;306 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");307 let number_prefix = &full_number[..number_offset];308 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];309 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());310 }311 Ok(())312 }313 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {314 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {315 let cmd_str = String::from_utf8_lossy(cmd.as_ref());316 tracing::debug!("{cmd_str}");317 };318 self.stdin.write_all(cmd.as_ref()).await?;319 self.stdin.write_all(b"\n").await?;320 Ok(())321 }322 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {323 let mut out = String::new();324 while let Some(line) = self.out.next().await {325 let line = match line {326 OutputLine::Out(out) => out,327 OutputLine::Err(err) => {328 err_handler.handle_line(&err);329 continue;330 }331 };332 if line == self.full_delimiter {333 return Ok(out);334 }335 if !out.is_empty() {336 out.push('\n');337 }338 out.push_str(&line);339 }340 Err(Error::MissingDelimiter)341 }342 pub(crate) async fn execute_expression_number(343 &mut self,344 expr: impl AsRef<[u8]>,345 ) -> Result<u64> {346 let num = self.number_wrapping.clone();347 let n = self.execute_expression_wrapping(expr, &num).await?;348 n.parse::<u64>().map_err(Error::Int)349 }350 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {351 // builtins.toJSON escapes some thing in incorrect way, e.g escaped "$" in "\${" is being outputed as "\$",352 // while this escape should be removed as it is intended for nix itself, not for json output.353 //354 // This regex only allows \$ in the beginning of the string, it is easier to implement correctly.355 // TODO: Add peg parser for nix-produced JSON?..356 let regex = regex::Regex::new(r#"(?<prefix>[: {,\[]\\")\\\$"#).expect("fixup json");357358 let num = self.string_wrapping.clone();359 let n = self.execute_expression_wrapping(expr, &num).await?;360 let n = regex.replace_all(&n, "$prefix$$");361 let str: String = serde_json::from_str(&n)?;362 Ok(str)363 }364 pub(crate) async fn execute_expression_to_json<V: DeserializeOwned>(365 &mut self,366 expr: impl AsRef<[u8]>,367 ) -> Result<V> {368 let mut fexpr = b"builtins.toJSON (".to_vec();369 fexpr.extend_from_slice(expr.as_ref());370 fexpr.push(b')');371372 Ok(serde_json::from_str(373 &self.execute_expression_string(fexpr).await?,374 )?)375 }376 async fn execute_expression_wrapping(377 &mut self,378 expr: impl AsRef<[u8]>,379 wrapping: &(String, String),380 ) -> Result<String> {381 let mut nix_handler = self.nix_handler.clone();382 let mut collected = ErrorCollector::new(&mut nix_handler);383 let res = self.execute_expression_raw(expr, &mut collected).await?;384 if res.is_empty() {385 collected.finish()?;386 return Err(Error::ExpectedOutput);387 } else {388 collected.flush()389 };390 let Some(res) = res.strip_prefix(&wrapping.0) else {391 return Err(Error::InvalidType);392 };393 let Some(res) = res.strip_suffix(&wrapping.1) else {394 return Err(Error::InvalidType);395 };396 Ok(res.to_owned())397 }398 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {399 let mut nix_handler = self.nix_handler.clone();400 let mut collected = ErrorCollector::new(&mut nix_handler);401 let v = self.execute_expression_raw(expr, &mut collected).await?;402 collected.finish()?;403 if !v.is_empty() {404 return Err(Error::UnexpectedOutput);405 }406 Ok(())407 }408 pub(crate) async fn execute_expression_raw(409 &mut self,410 expr: impl AsRef<[u8]>,411 err_handler: &mut dyn Handler,412 ) -> Result<String> {413 // Prevent two commands from being executed in parallel, messing with each other.414 let _lock = self.executing_command.clone();415 let _guard = _lock.lock().await;416417 self.send_command(expr).await?;418 // It will be echoed419 self.send_command(REPL_DELIMITER).await?;420 self.read_until_delimiter(err_handler).await421 }422 pub(crate) async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {423 let id = self.allocate_id();424 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))425 .await?;426 Ok(id)427 }428429 /// Id should be immediately used430 fn allocate_id(&mut self) -> u32 {431 if let Some(free) = self.free_list.pop() {432 free433 } else {434 let v = self.next_id;435 self.next_id += 1;436 v437 }438 }439 // Nix has no way to deallocate variable, yet GC will correct everything not reachable.440 // async fn free_id(&mut self, id: u32) -> Result<()> {441 // self.execute_expression_empty(format!("sess_field_{id} = null"))442 // .await?;443 // self.free_list.push(id);444 // Ok(())445 // }446}