1234use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 78 79 80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 151 152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self303 .execute_expression_string(fexpr)304 .await305 .context("string expression")?;306 serde_json::from_str(&v).context("json parse")307 }308 async fn execute_expression_wrapping(309 &mut self,310 expr: impl AsRef<[u8]>,311 wrapping: &(String, String),312 ) -> Result<String> {313 let mut nix_handler = self.nix_handler.clone();314 let mut collected = ErrorCollector::new(&mut nix_handler);315 let res = self.execute_expression_raw(expr, &mut collected).await?;316 if res.is_empty() {317 collected.finish()?;318 bail!("expected expression, got nothing")319 } else {320 collected.flush()321 };322 let Some(res) = res.strip_prefix(&wrapping.0) else {323 bail!("invalid type")324 };325 let Some(res) = res.strip_suffix(&wrapping.1) else {326 bail!("invalid type")327 };328 Ok(res.to_owned())329 }330 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {331 let mut nix_handler = self.nix_handler.clone();332 let mut collected = ErrorCollector::new(&mut nix_handler);333 let v = self.execute_expression_raw(expr, &mut collected).await?;334 collected.finish()?;335 ensure!(v.is_empty(), "unexpected expression result");336 Ok(())337 }338 async fn execute_expression_raw(339 &mut self,340 expr: impl AsRef<[u8]>,341 err_handler: &mut dyn Handler,342 ) -> Result<String> {343 344 let _lock = self.executing_command.clone();345 let _guard = _lock.lock().await;346347 self.send_command(expr).await?;348 349 self.send_command(REPL_DELIMITER).await?;350 self.read_until_delimiter(err_handler).await351 }352 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {353 let id = self.allocate_id();354 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))355 .await?;356 Ok(id)357 }358359 360 fn allocate_id(&mut self) -> u32 {361 if let Some(free) = self.free_list.pop() {362 free363 } else {364 let v = self.next_id;365 self.next_id += 1;366 v367 }368 }369 370 371 372 373 374 375 376}377378#[derive(Clone)]379pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);380381#[derive(Clone)]382pub struct NixExprBuilder {383 out: String,384 used_fields: Vec<Field>,385}386impl NixExprBuilder {387 pub fn object() -> Self {388 NixExprBuilder {389 out: "{ ".to_owned(),390 used_fields: Vec::new(),391 }392 }393 pub fn string(s: &str) -> Self {394 NixExprBuilder {395 out: nixlike::serialize(s)396 .expect("no problems with serializing_string")397 .trim_end()398 .to_owned(),399 used_fields: Vec::new(),400 }401 }402 pub fn serialized(v: impl Serialize) -> Self {403 let serialized = nixlike::serialize(v).expect("invalid value for apply");404 Self {405 out: serialized.trim_end().to_owned(),406 used_fields: Vec::new(),407 }408 }409 pub fn field(f: Field) -> Self {410 Self {411 out: format!("sess_field_{}", f.0.value.expect("no value")),412 used_fields: vec![f],413 }414 }415 pub fn end_obj(&mut self) {416 self.out.push('}');417 }418 pub fn obj_key(&mut self, name: Self, value: Self) {419 self.out.push_str(r#""${"#);420 self.extend(name);421 self.out.push_str(r#"}" = "#);422 self.extend(value);423 self.out.push_str("; ");424 }425426 pub fn extend(&mut self, e: Self) {427 self.out.push_str(&e.out);428 self.used_fields.extend(e.used_fields);429 }430431 #[allow(dead_code)]432 pub fn session(&self) -> NixSession {433 let mut session = None;434 for ele in &self.used_fields {435 if session.is_none() {436 session = Some(ele.0.session.clone());437 continue;438 }439 let session = &session.as_ref().expect("checked").0;440 let ele_sess = &ele.0.session.0;441 assert!(442 Arc::ptr_eq(session, ele_sess),443 "can't mix fields from different session"444 );445 }446 session.expect("expr without fields used")447 }448 #[allow(dead_code)]449 pub fn index_attr(&mut self, s: &str) {450 let escaped = nixlike::serialize(s).expect("string");451 self.out.push('.');452 self.out.push_str(escaped.trim_end());453 }454}455456#[macro_export]457macro_rules! nix_expr_inner {458 459 (@obj($o:ident) $field:ident, $($tt:tt)*) => {{460 $o.obj_key(461 NixExprBuilder::string(stringify!($field)),462 NixExprBuilder::field($field),463 );464 nix_expr_inner!(@obj($o) $($tt)*);465 }};466 (@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{467 $o.obj_key(468 NixExprBuilder::string(stringify!($field)),469 NixExprBuilder::serialized(&$v),470 );471 nix_expr_inner!(@obj($o) $($tt)*);472 }};473 (@obj($o:ident)) => {{}};474 (Obj { $($tt:tt)* }) => {{475 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};476 let mut out = NixExprBuilder::object();477 nix_expr_inner!(@obj(out) $($tt)*);478 out.end_obj();479 out480 }};481 (@field($o:ident) . $var:ident $($tt:tt)*) => {{482 $o.index_attr(stringify!($var));483 nix_expr_inner!(@field($o) $($tt)*);484 }};485 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{486 $o.push(Index::attr(&$v));487 nix_expr_inner!(@o($o) $($tt)*);488 }};489 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{490 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));491 nix_expr_inner!(@o($o) $($tt)*);492 }};493 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {494 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));495 nix_expr_inner!(@o($o) $($tt)*);496 };497 (@field($o:ident)) => {};498 ($field:ident $($tt:tt)*) => {{499 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};500 #[allow(unused_mut, reason = "might be used if indexed")]501 let mut out = NixExprBuilder::field($field.clone());502 nix_expr_inner!(@field(out) $($tt)*);503 out504 }};505 ($v:literal) => {{506 use $crate::better_nix_eval::NixExprBuilder;507 NixExprBuilder::string($v)508 }};509 ({$v:expr}) => {{510 use $crate::better_nix_eval::NixExprBuilder;511 NixExprBuilder::serialized(&$v)512 }}513}514#[macro_export]515macro_rules! nix_expr {516 ($($tt:tt)+) => {{517 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};518 let expr = nix_expr_inner!($($tt)+);519 Field::new(expr.session(), expr.out)520 }};521}522523#[macro_export]524macro_rules! nix_go {525 (@o($o:ident) . $var:ident $($tt:tt)*) => {{526 $o.push(Index::attr(stringify!($var)));527 nix_go!(@o($o) $($tt)*);528 }};529 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{530 $o.push(Index::attr(&$v));531 nix_go!(@o($o) $($tt)*);532 }};533 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{534 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));535 nix_go!(@o($o) $($tt)*);536 }};537 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {538 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));539 nix_go!(@o($o) $($tt)*);540 };541 (@o($o:ident) | $($var:tt)*) => {542 $o.push(Index::Pipe($crate::nix_expr_inner!($($var)+)));543 };544 (@o($o:ident)) => {};545 ($field:ident $($tt:tt)+) => {{546 use $crate::{nix_go, better_nix_eval::Index};547 let field = $field.clone();548 let mut out = vec![];549 nix_go!(@o(out) $($tt)*);550 field.select(out).await?551 }}552}553#[macro_export]554macro_rules! nix_go_json {555 ($($tt:tt)*) => {{556 $crate::nix_go!($($tt)*).as_json().await?557 }};558}559560#[derive(Clone)]561pub enum Index {562 Var(String),563 String(String),564 #[allow(dead_code)]565 Apply(String),566 #[allow(dead_code)]567 Expr(NixExprBuilder),568 ExprApply(NixExprBuilder),569 Pipe(NixExprBuilder),570}571impl Index {572 pub fn var(v: impl AsRef<str>) -> Self {573 let v = v.as_ref();574 assert!(575 !(v.contains('.') | v.contains(' ')),576 "bad variable name: {v}"577 );578 Self::Var(v.to_owned())579 }580 pub fn attr(v: impl AsRef<str>) -> Self {581 Self::String(v.as_ref().to_owned())582 }583 #[allow(dead_code)]584 pub fn apply(v: impl Serialize) -> Self {585 let serialized = nixlike::serialize(v).expect("invalid value for apply");586 Self::Apply(serialized.trim_end().to_owned())587 }588}589impl Display for Index {590 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {591 match self {592 Index::Var(v) => {593 write!(f, "{v}")594 }595 Index::String(k) => {596 let v = nixlike::format_identifier(k.as_str());597 write!(f, ".{v}")598 }599 Index::Apply(o) => {600 write!(f, "<apply>({o})")601 }602 Index::Expr(e) => {603 write!(f, "[{}]", e.out)604 }605 Index::ExprApply(e) => {606 write!(f, "<apply>({})", e.out)607 }608 Index::Pipe(e) => {609 write!(f, "<map>({})", e.out)610 }611 }612 }613}614impl fmt::Debug for Index {615 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {616 write!(f, "{self}")617 }618}619struct PathDisplay<'i>(&'i [Index]);620impl Display for PathDisplay<'_> {621 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {622 for i in self.0 {623 write!(f, "{i}")?;624 }625 Ok(())626 }627}628struct FieldInner {629 full_path: Option<Vec<Index>>,630 session: NixSession,631 value: Option<u32>,632}633fn context(op: &str, full_path: Option<&[Index]>, query: &str) -> String {634 if let Some(full_path) = &full_path {635 format!("on {op}, full path: {}", PathDisplay(full_path))636 } else {637 format!("query: {query:?}")638 }639}640#[derive(Clone)]641pub struct Field(Arc<FieldInner>);642impl Field {643 fn root(session: NixSession) -> Self {644 Self(Arc::new(FieldInner {645 full_path: Some(vec![]),646 session,647 value: None,648 }))649 }650 async fn new(session: NixSession, query: &str) -> Result<Self> {651 let vid = session652 .0653 .lock()654 .await655 .execute_assign(query)656 .await657 .with_context(|| context("new root", None, query))?;658 Ok(Self(Arc::new(FieldInner {659 full_path: None,660 session,661 value: Some(vid),662 })))663 }664 pub async fn field(session: NixSession, field: &str) -> Result<Self> {665 Self::root(session).select([Index::var(field)]).await666 }667 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {668 let mut used_fields = Vec::new();669 let mut name = name.into_iter();670671 let mut full_path = self.0.full_path.clone();672 let mut query = if let Some(id) = self.0.value {673 format!("sess_field_{id}")674 } else {675 let first = name.next();676 if let Some(Index::Var(i)) = first {677 if let Some(full_path) = &mut full_path {678 full_path.push(Index::Var(i.clone()));679 }680 i.clone()681 } else {682 panic!("first path item should be variable, got {first:?}")683 }684 };685 for v in name {686 if let Some(full_path) = &mut full_path {687 full_path.push(v.clone());688 }689 match v {690 Index::Var(_) => panic!("var item may only be first"),691 Index::String(s) => {692 let escaped = nixlike::serialize(s)?;693 query.push('.');694 query.push_str(escaped.trim());695 }696 Index::Apply(a) => {697 698 query = format!("({query} {a})");699 }700 Index::Expr(e) => {701 let index = Field::new(self.0.session.clone(), &e.out).await?;702 used_fields.push(index.clone());703 query.push('.');704 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));705 query.push_str(&index);706 }707 Index::ExprApply(e) => {708 let index = Field::new(self.0.session.clone(), &e.out).await?;709 used_fields.push(index.clone());710 query.push(' ');711 let index = format!("sess_field_{}", index.0.value.expect("value"));712 query.push_str(&index);713 query = format!("({query})");714 }715 Index::Pipe(v) => {716 let index = Field::new(self.0.session.clone(), &v.out).await?;717 used_fields.push(index.clone());718 let index = format!("sess_field_{}", index.0.value.expect("value"));719 query = format!("({index} {query})");720 }721 }722 }723724 let vid = self725 .0726 .session727 .0728 .lock()729 .await730 .execute_assign(&query)731 .await732 .with_context(|| {733 if let Some(full_path) = &full_path {734 format!("full path: {}", PathDisplay(full_path))735 } else {736 format!("query: {query:?}")737 }738 })?;739 Ok(Self(Arc::new(FieldInner {740 full_path,741 session: self.0.session.clone(),742 value: Some(vid),743 })))744 }745 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {746 let id = self.0.value.expect("can't serialize root field");747 let query = format!("sess_field_{id}");748 self.0749 .session750 .0751 .lock()752 .await753 .execute_expression_to_json(&query)754 .await755 .with_context(|| context("as_json", self.0.full_path.as_deref(), &query))756 }757 #[allow(dead_code)]758 pub async fn has_field(&self, name: &str) -> Result<bool> {759 let id = self.0.value.expect("can't list root fields");760 let key = nixlike::escape_string(name);761 let query = format!("sess_field_{id} ? {key}");762 self.0763 .session764 .0765 .lock()766 .await767 .execute_expression_to_json(&query)768 .await769 .with_context(|| context("has_field", self.0.full_path.as_deref(), &query))770 }771 pub async fn list_fields(&self) -> Result<Vec<String>> {772 let id = self.0.value.expect("can't list root fields");773 let query = format!("builtins.attrNames sess_field_{id}");774 self.0775 .session776 .0777 .lock()778 .await779 .execute_expression_to_json(&query)780 .await781 .with_context(|| context("list field", self.0.full_path.as_deref(), &query))782 }783 pub async fn type_of(&self) -> Result<String> {784 let id = self.0.value.expect("can't list root fields");785 let query = format!("builtins.typeOf sess_field_{id}");786 self.0787 .session788 .0789 .lock()790 .await791 .execute_expression_to_json(&query)792 .await793 .with_context(|| context("type_of", self.0.full_path.as_deref(), &query))794 }795 #[allow(dead_code)]796 pub async fn import(&self) -> Result<Self> {797 let import = Self::new(self.0.session.clone(), "import").await?;798 Ok(nix_go!(self | import))799 }800 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {801 let id = self.0.value.expect("can't use build on not-value");802 let query = format!(":b sess_field_{id}");803 let vid = self804 .0805 .session806 .0807 .lock()808 .await809 .execute_expression_raw(&query, &mut NixHandler::default())810 .await?;811 ensure!(812 !vid.is_empty(),813 "build failed: {}",814 context("build", self.0.full_path.as_deref(), &query),815 );816 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")817 else {818 panic!("unexpected build output: {vid:?}");819 };820 let outputs = vid821 .split('\n')822 .filter(|v| !v.is_empty())823 .map(|v| v.split_once(" -> ").expect("unexpected build output"))824 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))825 .collect();826 Ok(outputs)827 }828}829impl Drop for FieldInner {830 fn drop(&mut self) {831 if let Some(id) = self.value {832 if let Ok(mut lock) = self.session.0.try_lock() {833 lock.free_list.push(id)834 }835 836 }837 }838}839struct NixSessionPoolInner {840 flake: OsString,841 nix_args: Vec<OsString>,842}843844#[derive(Debug)]845pub struct NixPoolError(anyhow::Error);846impl From<anyhow::Error> for NixPoolError {847 fn from(value: anyhow::Error) -> Self {848 Self(value)849 }850}851impl std::error::Error for NixPoolError {}852impl std::fmt::Display for NixPoolError {853 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {854 self.0.fmt(f)855 }856}857impl r2d2::ManageConnection for NixSessionPoolInner {858 type Connection = NixSessionInner;859 type Error = NixPoolError;860 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {861 let _v = TOKIO_RUNTIME862 .get()863 .expect("missed tokio runtime init!")864 .enter();865 Ok(futures::executor::block_on(NixSessionInner::new(866 self.flake.as_os_str(),867 self.nix_args.iter().map(OsString::as_os_str),868 ))?)869 }870871 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {872 let _v = TOKIO_RUNTIME873 .get()874 .expect("missed tokio runtime init!")875 .enter();876 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;877 if res != 4 {878 return Err(anyhow!("sanity check failed").into());879 };880 Ok(())881 }882883 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {884 false885 }886}887pub struct NixSessionPool(Pool<NixSessionPoolInner>);888impl NixSessionPool {889 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {890 let inner = tokio::task::block_in_place(|| {891 r2d2::Builder::<NixSessionPoolInner>::new()892 .min_idle(Some(0))893 .build(NixSessionPoolInner { flake, nix_args })894 })?;895 Ok(Self(inner))896 }897 pub async fn get(&self) -> Result<NixSession> {898 let v = tokio::task::block_in_place(|| self.0.get())?;899 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))900 }901}902903pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();