1234use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, Handler, NixHandler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 78 79 80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 151 152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self303 .execute_expression_string(fexpr)304 .await305 .context("string expression")?;306 serde_json::from_str(&v).context("json parse")307 }308 async fn execute_expression_wrapping(309 &mut self,310 expr: impl AsRef<[u8]>,311 wrapping: &(String, String),312 ) -> Result<String> {313 let mut nix_handler = self.nix_handler.clone();314 let mut collected = ErrorCollector::new(&mut nix_handler);315 let res = self.execute_expression_raw(expr, &mut collected).await?;316 if res.is_empty() {317 collected.finish()?;318 bail!("expected expression, got nothing")319 } else {320 collected.flush()321 };322 let Some(res) = res.strip_prefix(&wrapping.0) else {323 bail!("invalid type")324 };325 let Some(res) = res.strip_suffix(&wrapping.1) else {326 bail!("invalid type")327 };328 Ok(res.to_owned())329 }330 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {331 let mut nix_handler = self.nix_handler.clone();332 let mut collected = ErrorCollector::new(&mut nix_handler);333 let v = self.execute_expression_raw(expr, &mut collected).await?;334 collected.finish()?;335 ensure!(v.is_empty(), "unexpected expression result");336 Ok(())337 }338 async fn execute_expression_raw(339 &mut self,340 expr: impl AsRef<[u8]>,341 err_handler: &mut dyn Handler,342 ) -> Result<String> {343 344 let _lock = self.executing_command.clone();345 let _guard = _lock.lock().await;346347 self.send_command(expr).await?;348 349 self.send_command(REPL_DELIMITER).await?;350 self.read_until_delimiter(err_handler).await351 }352 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {353 let id = self.allocate_id();354 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))355 .await?;356 Ok(id)357 }358359 360 fn allocate_id(&mut self) -> u32 {361 if let Some(free) = self.free_list.pop() {362 free363 } else {364 let v = self.next_id;365 self.next_id += 1;366 v367 }368 }369 370 371 372 373 374 375 376}377378#[derive(Clone)]379pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);380381#[derive(Clone)]382pub struct NixExprBuilder {383 out: String,384 used_fields: Vec<Field>,385}386impl NixExprBuilder {387 pub fn object() -> Self {388 NixExprBuilder {389 out: "{ ".to_owned(),390 used_fields: Vec::new(),391 }392 }393 pub fn string(s: &str) -> Self {394 NixExprBuilder {395 out: nixlike::serialize(s)396 .expect("no problems with serializing_string")397 .trim_end()398 .to_owned(),399 used_fields: Vec::new(),400 }401 }402 pub fn serialized(v: impl Serialize) -> Self {403 let serialized = nixlike::serialize(v).expect("invalid value for apply");404 Self {405 out: serialized.trim_end().to_owned(),406 used_fields: Vec::new(),407 }408 }409 pub fn field(f: Field) -> Self {410 Self {411 out: format!("sess_field_{}", f.0.value.expect("no value")),412 used_fields: vec![f],413 }414 }415 pub fn end_obj(&mut self) {416 self.out.push('}');417 }418 pub fn obj_key(&mut self, name: Self, value: Self) {419 self.out.push_str(r#""${"#);420 self.extend(name);421 self.out.push_str(r#"}" = "#);422 self.extend(value);423 self.out.push_str("; ");424 }425426 pub fn extend(&mut self, e: Self) {427 self.out.push_str(&e.out);428 self.used_fields.extend(e.used_fields);429 }430431 pub fn session(&self) -> NixSession {432 let mut session = None;433 for ele in &self.used_fields {434 if session.is_none() {435 session = Some(ele.0.session.clone());436 continue;437 }438 let session = &session.as_ref().expect("checked").0;439 let ele_sess = &ele.0.session.0;440 assert!(441 Arc::ptr_eq(session, ele_sess),442 "can't mix fields from different session"443 );444 }445 session.expect("expr without fields used")446 }447 pub fn index_attr(&mut self, s: &str) {448 let escaped = nixlike::serialize(s).expect("string");449 self.out.push('.');450 self.out.push_str(escaped.trim_end());451 }452}453454#[macro_export]455macro_rules! nix_expr_inner {456 457 (@obj($o:ident) $field:ident, $($tt:tt)*) => {{458 $o.obj_key(459 NixExprBuilder::string(stringify!($field)),460 NixExprBuilder::field($field),461 );462 nix_expr_inner!(@obj($o) $($tt)*);463 }};464 (@obj($o:ident) $field:ident: $v:block, $($tt:tt)*) => {{465 $o.obj_key(466 NixExprBuilder::string(stringify!($field)),467 NixExprBuilder::serialized(&$v),468 );469 nix_expr_inner!(@obj($o) $($tt)*);470 }};471 (@obj($o:ident)) => {{}};472 (Obj { $($tt:tt)* }) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 let mut out = NixExprBuilder::object();475 nix_expr_inner!(@obj(out) $($tt)*);476 out.end_obj();477 out478 }};479 (@field($o:ident) . $var:ident $($tt:tt)*) => {{480 $o.index_attr(stringify!($var));481 nix_expr_inner!(@field($o) $($tt)*);482 }};483 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{484 $o.push(Index::attr(&$v));485 nix_expr_inner!(@o($o) $($tt)*);486 }};487 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{488 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));489 nix_expr_inner!(@o($o) $($tt)*);490 }};491 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {492 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));493 nix_expr_inner!(@o($o) $($tt)*);494 };495 (@field($o:ident)) => {};496 ($field:ident $($tt:tt)*) => {{497 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};498 #[allow(unused_mut, reason = "might be used if indexed")]499 let mut out = NixExprBuilder::field($field.clone());500 nix_expr_inner!(@field(out) $($tt)*);501 out502 }};503 ($v:literal) => {{504 use $crate::better_nix_eval::NixExprBuilder;505 NixExprBuilder::string($v)506 }};507 ({$v:expr}) => {{508 use $crate::better_nix_eval::NixExprBuilder;509 NixExprBuilder::serialized(&$v)510 }}511}512#[macro_export]513macro_rules! nix_expr {514 ($($tt:tt)+) => {{515 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};516 let expr = nix_expr_inner!($($tt)+);517 Field::new(expr.session(), expr.out)518 }};519}520521#[macro_export]522macro_rules! nix_go {523 (@o($o:ident) . $var:ident $($tt:tt)*) => {{524 $o.push(Index::attr(stringify!($var)));525 nix_go!(@o($o) $($tt)*);526 }};527 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{528 $o.push(Index::attr(&$v));529 nix_go!(@o($o) $($tt)*);530 }};531 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{532 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));533 nix_go!(@o($o) $($tt)*);534 }};535 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {536 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));537 nix_go!(@o($o) $($tt)*);538 };539 (@o($o:ident) | $($var:tt)*) => {540 $o.push(Index::Pipe($crate::nix_expr_inner!($($var)+)));541 };542 (@o($o:ident)) => {};543 ($field:ident $($tt:tt)+) => {{544 use $crate::{nix_go, better_nix_eval::Index};545 let field = $field.clone();546 let mut out = vec![];547 nix_go!(@o(out) $($tt)*);548 field.select(out).await?549 }}550}551#[macro_export]552macro_rules! nix_go_json {553 ($($tt:tt)*) => {{554 $crate::nix_go!($($tt)*).as_json().await?555 }};556}557558#[derive(Clone)]559pub enum Index {560 Var(String),561 String(String),562 Apply(String),563 Expr(NixExprBuilder),564 ExprApply(NixExprBuilder),565 Pipe(NixExprBuilder),566}567impl Index {568 pub fn var(v: impl AsRef<str>) -> Self {569 let v = v.as_ref();570 assert!(571 !(v.contains('.') | v.contains(' ')),572 "bad variable name: {v}"573 );574 Self::Var(v.to_owned())575 }576 pub fn attr(v: impl AsRef<str>) -> Self {577 Self::String(v.as_ref().to_owned())578 }579 pub fn apply(v: impl Serialize) -> Self {580 let serialized = nixlike::serialize(v).expect("invalid value for apply");581 Self::Apply(serialized.trim_end().to_owned())582 }583}584impl Display for Index {585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586 match self {587 Index::Var(v) => {588 write!(f, "{v}")589 }590 Index::String(k) => {591 let v = nixlike::format_identifier(k.as_str());592 write!(f, ".{v}")593 }594 Index::Apply(o) => {595 write!(f, "<apply>({o})")596 }597 Index::Expr(e) => {598 write!(f, "[{}]", e.out)599 }600 Index::ExprApply(e) => {601 write!(f, "<apply>({})", e.out)602 }603 Index::Pipe(e) => {604 write!(f, "<map>({})", e.out)605 }606 }607 }608}609impl fmt::Debug for Index {610 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {611 write!(f, "{self}")612 }613}614struct PathDisplay<'i>(&'i [Index]);615impl Display for PathDisplay<'_> {616 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {617 for i in self.0 {618 write!(f, "{i}")?;619 }620 Ok(())621 }622}623struct FieldInner {624 full_path: Option<Vec<Index>>,625 session: NixSession,626 value: Option<u32>,627}628fn context(op: &str, full_path: Option<&[Index]>, query: &str) -> String {629 if let Some(full_path) = &full_path {630 format!("on {op}, full path: {}", PathDisplay(full_path))631 } else {632 format!("query: {query:?}")633 }634}635#[derive(Clone)]636pub struct Field(Arc<FieldInner>);637impl Field {638 fn root(session: NixSession) -> Self {639 Self(Arc::new(FieldInner {640 full_path: Some(vec![]),641 session,642 value: None,643 }))644 }645 async fn new(session: NixSession, query: &str) -> Result<Self> {646 let vid = session647 .0648 .lock()649 .await650 .execute_assign(query)651 .await652 .with_context(|| context("new root", None, query))?;653 Ok(Self(Arc::new(FieldInner {654 full_path: None,655 session,656 value: Some(vid),657 })))658 }659 pub async fn field(session: NixSession, field: &str) -> Result<Self> {660 Self::root(session).select([Index::var(field)]).await661 }662 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {663 let mut used_fields = Vec::new();664 let mut name = name.into_iter();665666 let mut full_path = self.0.full_path.clone();667 let mut query = if let Some(id) = self.0.value {668 format!("sess_field_{id}")669 } else {670 let first = name.next();671 if let Some(Index::Var(i)) = first {672 if let Some(full_path) = &mut full_path {673 full_path.push(Index::Var(i.clone()));674 }675 i.clone()676 } else {677 panic!("first path item should be variable, got {first:?}")678 }679 };680 for v in name {681 if let Some(full_path) = &mut full_path {682 full_path.push(v.clone());683 }684 match v {685 Index::Var(_) => panic!("var item may only be first"),686 Index::String(s) => {687 let escaped = nixlike::serialize(s)?;688 query.push('.');689 query.push_str(escaped.trim());690 }691 Index::Apply(a) => {692 693 query = format!("({query} {a})");694 }695 Index::Expr(e) => {696 let index = Field::new(self.0.session.clone(), &e.out).await?;697 used_fields.push(index.clone());698 query.push('.');699 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));700 query.push_str(&index);701 }702 Index::ExprApply(e) => {703 let index = Field::new(self.0.session.clone(), &e.out).await?;704 used_fields.push(index.clone());705 query.push(' ');706 let index = format!("sess_field_{}", index.0.value.expect("value"));707 query.push_str(&index);708 query = format!("({query})");709 }710 Index::Pipe(v) => {711 let index = Field::new(self.0.session.clone(), &v.out).await?;712 used_fields.push(index.clone());713 let index = format!("sess_field_{}", index.0.value.expect("value"));714 query = format!("({index} {query})");715 }716 }717 }718719 let vid = self720 .0721 .session722 .0723 .lock()724 .await725 .execute_assign(&query)726 .await727 .with_context(|| {728 if let Some(full_path) = &full_path {729 format!("full path: {}", PathDisplay(full_path))730 } else {731 format!("query: {query:?}")732 }733 })?;734 Ok(Self(Arc::new(FieldInner {735 full_path,736 session: self.0.session.clone(),737 value: Some(vid),738 })))739 }740 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {741 let id = self.0.value.expect("can't serialize root field");742 let query = format!("sess_field_{id}");743 self.0744 .session745 .0746 .lock()747 .await748 .execute_expression_to_json(&query)749 .await750 .with_context(|| context("as_json", self.0.full_path.as_deref(), &query))751 }752 pub async fn has_field(&self, name: &str) -> Result<bool> {753 let id = self.0.value.expect("can't list root fields");754 let key = nixlike::escape_string(name);755 let query = format!("sess_field_{id} ? {key}");756 self.0757 .session758 .0759 .lock()760 .await761 .execute_expression_to_json(&query)762 .await763 .with_context(|| context("has_field", self.0.full_path.as_deref(), &query))764 }765 pub async fn list_fields(&self) -> Result<Vec<String>> {766 let id = self.0.value.expect("can't list root fields");767 let query = format!("builtins.attrNames sess_field_{id}");768 self.0769 .session770 .0771 .lock()772 .await773 .execute_expression_to_json(&query)774 .await775 .with_context(|| context("list field", self.0.full_path.as_deref(), &query))776 }777 pub async fn type_of(&self) -> Result<String> {778 let id = self.0.value.expect("can't list root fields");779 let query = format!("builtins.typeOf sess_field_{id}");780 self.0781 .session782 .0783 .lock()784 .await785 .execute_expression_to_json(&query)786 .await787 .with_context(|| context("type_of", self.0.full_path.as_deref(), &query))788 }789 pub async fn import(&self) -> Result<Self> {790 let import = Self::new(self.0.session.clone(), "import").await?;791 Ok(nix_go!(self | import))792 }793 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {794 let id = self.0.value.expect("can't use build on not-value");795 let query = format!(":b sess_field_{id}");796 let vid = self797 .0798 .session799 .0800 .lock()801 .await802 .execute_expression_raw(&query, &mut NixHandler::default())803 .await?;804 ensure!(805 !vid.is_empty(),806 "build failed: {}",807 context("build", self.0.full_path.as_deref(), &query),808 );809 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")810 else {811 panic!("unexpected build output: {vid:?}");812 };813 let outputs = vid814 .split('\n')815 .filter(|v| !v.is_empty())816 .map(|v| v.split_once(" -> ").expect("unexpected build output"))817 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))818 .collect();819 Ok(outputs)820 }821}822impl Drop for FieldInner {823 fn drop(&mut self) {824 if let Some(id) = self.value {825 if let Ok(mut lock) = self.session.0.try_lock() {826 lock.free_list.push(id)827 }828 829 }830 }831}832struct NixSessionPoolInner {833 flake: OsString,834 nix_args: Vec<OsString>,835}836837#[derive(Debug)]838pub struct NixPoolError(anyhow::Error);839impl From<anyhow::Error> for NixPoolError {840 fn from(value: anyhow::Error) -> Self {841 Self(value)842 }843}844impl std::error::Error for NixPoolError {}845impl std::fmt::Display for NixPoolError {846 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {847 self.0.fmt(f)848 }849}850impl r2d2::ManageConnection for NixSessionPoolInner {851 type Connection = NixSessionInner;852 type Error = NixPoolError;853 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {854 let _v = TOKIO_RUNTIME855 .get()856 .expect("missed tokio runtime init!")857 .enter();858 Ok(futures::executor::block_on(NixSessionInner::new(859 self.flake.as_os_str(),860 self.nix_args.iter().map(OsString::as_os_str),861 ))?)862 }863864 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {865 let _v = TOKIO_RUNTIME866 .get()867 .expect("missed tokio runtime init!")868 .enter();869 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;870 if res != 4 {871 return Err(anyhow!("sanity check failed").into());872 };873 Ok(())874 }875876 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {877 false878 }879}880pub struct NixSessionPool(Pool<NixSessionPoolInner>);881impl NixSessionPool {882 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {883 let inner = tokio::task::block_in_place(|| {884 r2d2::Builder::<NixSessionPoolInner>::new()885 .min_idle(Some(0))886 .build(NixSessionPoolInner { flake, nix_args })887 })?;888 Ok(Self(inner))889 }890 pub async fn get(&self) -> Result<NixSession> {891 let v = tokio::task::block_in_place(|| self.0.get())?;892 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))893 }894}895896pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();