1234use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28 full_delimiter: String,29 nix_handler: ClonableHandler<NixHandler>,30 out: OutputHandler,31 stdin: ChildStdin,32 string_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,3637 next_id: u32,38 free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45 collected: Vec<String>,46 inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49 fn new(inner: &'i mut H) -> Self {50 Self {51 collected: vec![],52 inner,53 }54 }55}56impl<H> ErrorCollector<'_, H> {57 fn handle_line_inner(&mut self, msg: &str) -> bool {58 let Some(msg) = msg.strip_prefix("@nix ") else {59 return false;60 };61 #[derive(Deserialize)]62 struct ErrorAction {63 action: String,64 level: u32,65 msg: String,66 }67 let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68 return false;69 };70 if act.action != "msg" || act.level != 0 {71 return false;72 }73 self.collected.push(act.msg);74 true75 }76 fn finish(self) -> Result<()> {77 78 79 80 if !self.collected.is_empty() {81 bail!(82 "{}",83 self.collected84 .iter()85 .map(|v| {86 if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87 let v = unindent::unindent(f.trim_start());88 v.trim().to_owned()89 } else {90 v.to_owned()91 }92 })93 .join("\n")94 );95 }96 Ok(())97 }98 fn flush(self) {99 for line in self.collected {100 warn!("{line}");101 }102 }103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105 fn handle_line(&mut self, e: &str) {106 if self.handle_line_inner(e) {107 return;108 }109 self.inner.handle_line(e)110 }111}112113enum OutputLine {114 Out(String),115 Err(String),116}117struct OutputHandler {118 rx: mpsc::Receiver<OutputLine>,119 _cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122 fn new(out: ChildStdout, err: ChildStderr) -> Self {123 let mut out = FramedRead::new(out, LinesCodec::new());124 let mut err = FramedRead::new(err, LinesCodec::new());125 let (tx, rx) = mpsc::channel(20);126 let (mut cancelled, _cancel_handle) = oneshot::channel();127 tokio::spawn(async move {128 loop {129 select! {130 131 biased;132 e = err.next() => {133 let Some(Ok(e)) = e else {134 if e.is_some() {135 error!("bad repl stderr: {e:?}");136 }137 continue;138 };139 let _ = tx.send(OutputLine::Err(e)).await;140 }141 o = out.next() => {142 let Some(Ok(o)) = o else {143 if o.is_some() {144 error!("bad repl stdout: {o:?}");145 }146 continue;147 };148 let _ = tx.send(OutputLine::Out(o)).await;149 }150 151 152 _ = cancelled.closed() => {153 break;154 }155 }156 }157 });158 Self { rx, _cancel_handle }159 }160 async fn next(&mut self) -> Option<OutputLine> {161 self.rx.recv().await162 }163}164165struct WarnHandler;166impl Handler for WarnHandler {167 fn handle_line(&mut self, e: &str) {168 warn!(target: "nix", "{e}")169 }170}171172impl NixSessionInner {173 async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174 let mut cmd = Command::new("nix");175 cmd.arg("repl")176 .arg(flake)177 .arg("--log-format")178 .arg("internal-json");179 for arg in extra_args {180 cmd.arg(arg);181 }182 cmd.stdin(Stdio::piped());183 cmd.stdout(Stdio::piped());184 cmd.stderr(Stdio::piped());185 let cmd = cmd.spawn()?;186 let stdout = cmd.stdout.unwrap();187 let stderr = cmd.stderr.unwrap();188 let mut out = OutputHandler::new(stdout, stderr);189 let mut stdin = cmd.stdin.unwrap();190 191 stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192 stdin.write_all(b"\n").await?;193 stdin.flush().await?;194 let nix_handler = NixHandler::default();195 let mut full_delimiter = None;196 let mut errors = vec![];197 while let Some(line) = out.next().await {198 let line = match line {199 OutputLine::Out(o) => o,200 OutputLine::Err(_e) => {201 202 errors.push(_e);203 continue;204 }205 };206 if line.contains(REPL_DELIMITER) {207 debug!("discovered repl delimiter with added colors: {line}");208 full_delimiter = Some(line.to_owned());209 break;210 }211 }212 let Some(full_delimiter) = full_delimiter else {213 for e in errors {214 error!("{e}");215 }216 bail!("failed to discover delimiter");217 };218 let mut res = Self {219 full_delimiter,220 nix_handler: ClonableHandler::new(nix_handler),221 out,222 stdin,223 string_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),227228 next_id: 0,229 free_list: vec![],230 };231 res.train().await?;232 Ok(res)233 }234 async fn train(&mut self) -> Result<()> {235 {236 let full_string = self237 .execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238 .await?;239 let string_offset = full_string.find(TRAIN_STRING).expect("contained");240 let string_prefix = &full_string[..string_offset];241 let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242 self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243 }244 {245 let full_number = self246 .execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247 .await?;248 let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249 let number_prefix = &full_number[..number_offset];250 let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251 self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252 }253 Ok(())254 }255 async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256 if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257 let cmd_str = String::from_utf8_lossy(cmd.as_ref());258 tracing::debug!("{cmd_str}");259 };260 self.stdin.write_all(cmd.as_ref()).await?;261 self.stdin.write_all(b"\n").await?;262 Ok(())263 }264 async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265 let mut out = String::new();266 while let Some(line) = self.out.next().await {267 let line = match line {268 OutputLine::Out(out) => out,269 OutputLine::Err(err) => {270 err_handler.handle_line(&err);271 continue;272 }273 };274 if line == self.full_delimiter {275 return Ok(out);276 }277 if !out.is_empty() {278 out.push('\n');279 }280 out.push_str(&line);281 }282 bail!("didn't reached delimiter");283 }284 async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285 let num = self.number_wrapping.clone();286 let n = self.execute_expression_wrapping(expr, &num).await?;287 Ok(n.parse::<u64>()?)288 }289 async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290 let num = self.string_wrapping.clone();291 let n = self.execute_expression_wrapping(expr, &num).await?;292 let str: String = serde_json::from_str(&n)?;293 Ok(str)294 }295 async fn execute_expression_to_json<V: DeserializeOwned>(296 &mut self,297 expr: impl AsRef<[u8]>,298 ) -> Result<V> {299 let mut fexpr = b"builtins.toJSON (".to_vec();300 fexpr.extend_from_slice(expr.as_ref());301 fexpr.push(b')');302 let v = self.execute_expression_string(fexpr).await?;303 Ok(serde_json::from_str(&v)?)304 }305 async fn execute_expression_wrapping(306 &mut self,307 expr: impl AsRef<[u8]>,308 wrapping: &(String, String),309 ) -> Result<String> {310 let mut nix_handler = self.nix_handler.clone();311 let mut collected = ErrorCollector::new(&mut nix_handler);312 let res = self.execute_expression_raw(expr, &mut collected).await?;313 if res.is_empty() {314 collected.finish()?;315 bail!("expected expression, got nothing")316 } else {317 collected.flush()318 };319 let Some(res) = res.strip_prefix(&wrapping.0) else {320 bail!("invalid type")321 };322 let Some(res) = res.strip_suffix(&wrapping.1) else {323 bail!("invalid type")324 };325 Ok(res.to_owned())326 }327 async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {328 let mut nix_handler = self.nix_handler.clone();329 let mut collected = ErrorCollector::new(&mut nix_handler);330 let v = self.execute_expression_raw(expr, &mut collected).await?;331 collected.finish()?;332 ensure!(v.is_empty(), "unexpected expression result");333 Ok(())334 }335 async fn execute_expression_raw(336 &mut self,337 expr: impl AsRef<[u8]>,338 err_handler: &mut dyn Handler,339 ) -> Result<String> {340 341 let _lock = self.executing_command.clone();342 let _guard = _lock.lock().await;343344 self.send_command(expr).await?;345 346 self.send_command(REPL_DELIMITER).await?;347 self.read_until_delimiter(err_handler).await348 }349 async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {350 let id = self.allocate_id();351 self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))352 .await?;353 Ok(id)354 }355356 357 fn allocate_id(&mut self) -> u32 {358 if let Some(free) = self.free_list.pop() {359 free360 } else {361 let v = self.next_id;362 self.next_id += 1;363 v364 }365 }366 367 368 369 370 371 372 373}374375#[derive(Clone)]376pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);377378#[derive(Clone)]379pub struct NixExprBuilder {380 out: String,381 used_fields: Vec<Field>,382}383impl NixExprBuilder {384 pub fn object() -> Self {385 NixExprBuilder {386 out: "{ ".to_owned(),387 used_fields: Vec::new(),388 }389 }390 pub fn string(s: &str) -> Self {391 NixExprBuilder {392 out: nixlike::serialize(s)393 .expect("no problems with serializing_string")394 .trim_end()395 .to_owned(),396 used_fields: Vec::new(),397 }398 }399 pub fn serialized(v: impl Serialize) -> Self {400 let serialized = nixlike::serialize(v).expect("invalid value for apply");401 Self {402 out: serialized.trim_end().to_owned(),403 used_fields: Vec::new(),404 }405 }406 pub fn field(f: Field) -> Self {407 Self {408 out: format!("sess_field_{}", f.0.value.expect("no value")),409 used_fields: vec![f],410 }411 }412 pub fn end_obj(&mut self) {413 self.out.push('}');414 }415 pub fn obj_key(&mut self, name: Self, value: Self) {416 self.out.push_str(r#""${"#);417 self.extend(name);418 self.out.push_str(r#"}" = "#);419 self.extend(value);420 self.out.push_str("; ");421 }422423 pub fn extend(&mut self, e: Self) {424 self.out.push_str(&e.out);425 self.used_fields.extend(e.used_fields);426 }427428 pub fn session(&self) -> NixSession {429 let mut session = None;430 for ele in &self.used_fields {431 if session.is_none() {432 session = Some(ele.0.session.clone());433 continue;434 }435 let session = &session.as_ref().expect("checked").0;436 let ele_sess = &ele.0.session.0;437 assert!(438 Arc::ptr_eq(session, ele_sess),439 "can't mix fields from different session"440 );441 }442 session.expect("expr without fields used")443 }444 pub fn index_attr(&mut self, s: &str) {445 let escaped = nixlike::serialize(s).expect("string");446 self.out.push('.');447 self.out.push_str(escaped.trim_end());448 }449}450451#[macro_export]452macro_rules! nix_expr_inner {453 (Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{454 use $crate::better_nix_eval::NixExprBuilder;455 let mut out = NixExprBuilder::object();456 $(457 out.obj_key(458 NixExprBuilder::string(stringify!($ident)),459 $crate::nix_expr_inner!($($val)+),460 );461 )*462 out.end_obj();463 out464 }};465 (@field($o:ident) . $var:ident $($tt:tt)*) => {{466 $o.index_attr(stringify!($var));467 nix_expr_inner!(@field($o) $($tt)*);468 }};469 (@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{470 $o.push(Index::attr(&$v));471 nix_expr_inner!(@o($o) $($tt)*);472 }};473 (@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{474 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));475 nix_expr_inner!(@o($o) $($tt)*);476 }};477 (@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {478 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));479 nix_expr_inner!(@o($o) $($tt)*);480 };481 (@field($o:ident)) => {};482 ($field:ident $($tt:tt)*) => {{483 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};484 #[allow(unused_mut, reason = "might be used if indexed")]485 let mut out = NixExprBuilder::field($field.clone());486 nix_expr_inner!(@field(out) $($tt)*);487 out488 }};489 ($v:literal) => {{490 use $crate::better_nix_eval::NixExprBuilder;491 NixExprBuilder::string($v)492 }};493 ({$v:expr}) => {{494 use $crate::better_nix_eval::NixExprBuilder;495 NixExprBuilder::serialized(&$v)496 }}497}498#[macro_export]499macro_rules! nix_expr {500 ($($tt:tt)+) => {{501 use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};502 let expr = nix_expr_inner!($($tt)+);503 Field::new(expr.session(), expr.out)504 }};505}506507#[macro_export]508macro_rules! nix_go {509 (@o($o:ident) . $var:ident $($tt:tt)*) => {{510 $o.push(Index::attr(stringify!($var)));511 nix_go!(@o($o) $($tt)*);512 }};513 (@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{514 $o.push(Index::attr(&$v));515 nix_go!(@o($o) $($tt)*);516 }};517 (@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{518 $o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));519 nix_go!(@o($o) $($tt)*);520 }};521 (@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {522 $o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));523 nix_go!(@o($o) $($tt)*);524 };525 (@o($o:ident)) => {};526 ($field:ident $($tt:tt)+) => {{527 use $crate::{nix_go, better_nix_eval::Index};528 let field = $field.clone();529 let mut out = vec![];530 nix_go!(@o(out) $($tt)*);531 field.select(out).await?532 }}533}534#[macro_export]535macro_rules! nix_go_json {536 ($($tt:tt)*) => {{537 $crate::nix_go!($($tt)*).as_json().await?538 }};539}540541#[derive(Clone)]542pub enum Index {543 Var(String),544 String(String),545 Apply(String),546 Expr(NixExprBuilder),547 ExprApply(NixExprBuilder),548}549impl Index {550 pub fn var(v: impl AsRef<str>) -> Self {551 let v = v.as_ref();552 assert!(553 !(v.contains('.') | v.contains(' ')),554 "bad variable name: {v}"555 );556 Self::Var(v.to_owned())557 }558 pub fn attr(v: impl AsRef<str>) -> Self {559 Self::String(v.as_ref().to_owned())560 }561 pub fn apply(v: impl Serialize) -> Self {562 let serialized = nixlike::serialize(v).expect("invalid value for apply");563 Self::Apply(serialized.trim_end().to_owned())564 }565}566impl Display for Index {567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {568 match self {569 Index::Var(v) => {570 write!(f, "{v}")571 }572 Index::String(k) => {573 let v = nixlike::format_identifier(k.as_str());574 write!(f, ".{v}")575 }576 Index::Apply(o) => {577 write!(f, "<apply>({o})")578 }579 Index::Expr(e) => {580 write!(f, "[{}]", e.out)581 }582 Index::ExprApply(e) => {583 write!(f, "<apply>({})", e.out)584 }585 }586 }587}588impl fmt::Debug for Index {589 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {590 write!(f, "{self}")591 }592}593struct PathDisplay<'i>(&'i [Index]);594impl Display for PathDisplay<'_> {595 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {596 for i in self.0 {597 write!(f, "{i}")?;598 }599 Ok(())600 }601}602struct FieldInner {603 full_path: Option<Vec<Index>>,604 session: NixSession,605 value: Option<u32>,606}607fn context(full_path: Option<&[Index]>, query: &str) -> String {608 if let Some(full_path) = &full_path {609 format!("full path: {}", PathDisplay(full_path))610 } else {611 format!("query: {query:?}")612 }613}614#[derive(Clone)]615pub struct Field(Arc<FieldInner>);616impl Field {617 fn root(session: NixSession) -> Self {618 Self(Arc::new(FieldInner {619 full_path: Some(vec![]),620 session,621 value: None,622 }))623 }624 async fn new(session: NixSession, query: &str) -> Result<Self> {625 let vid = session626 .0627 .lock()628 .await629 .execute_assign(query)630 .await631 .with_context(|| context(None, query))?;632 Ok(Self(Arc::new(FieldInner {633 full_path: None,634 session,635 value: Some(vid),636 })))637 }638 pub async fn field(session: NixSession, field: &str) -> Result<Self> {639 Self::root(session).select([Index::var(field)]).await640 }641 pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {642 let mut used_fields = Vec::new();643 let mut name = name.into_iter();644645 let mut full_path = self.0.full_path.clone();646 let mut query = if let Some(id) = self.0.value {647 format!("sess_field_{id}")648 } else {649 let first = name.next();650 if let Some(Index::Var(i)) = first {651 if let Some(full_path) = &mut full_path {652 full_path.push(Index::Var(i.clone()));653 }654 i.clone()655 } else {656 panic!("first path item should be variable, got {first:?}")657 }658 };659 for v in name {660 if let Some(full_path) = &mut full_path {661 full_path.push(v.clone());662 }663 match v {664 Index::Var(_) => panic!("var item may only be first"),665 Index::String(s) => {666 let escaped = nixlike::serialize(s)?;667 query.push('.');668 query.push_str(escaped.trim());669 }670 Index::Apply(a) => {671 672 query = format!("({query} {a})");673 }674 Index::Expr(e) => {675 let index = Field::new(self.0.session.clone(), &e.out).await?;676 used_fields.push(index.clone());677 query.push('.');678 let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));679 query.push_str(&index);680 }681 Index::ExprApply(e) => {682 let index = Field::new(self.0.session.clone(), &e.out).await?;683 used_fields.push(index.clone());684 query.push(' ');685 let index = format!("sess_field_{}", index.0.value.expect("value"));686 query.push_str(&index);687 query = format!("({query})");688 }689 }690 }691692 let vid = self693 .0694 .session695 .0696 .lock()697 .await698 .execute_assign(&query)699 .await700 .with_context(|| {701 if let Some(full_path) = &full_path {702 format!("full path: {}", PathDisplay(full_path))703 } else {704 format!("query: {query:?}")705 }706 })?;707 Ok(Self(Arc::new(FieldInner {708 full_path,709 session: self.0.session.clone(),710 value: Some(vid),711 })))712 }713 pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {714 let id = self.0.value.expect("can't serialize root field");715 let query = format!("sess_field_{id}");716 self.0717 .session718 .0719 .lock()720 .await721 .execute_expression_to_json(&query)722 .await723 .with_context(|| context(self.0.full_path.as_deref(), &query))724 }725 pub async fn has_field(&self, name: &str) -> Result<bool> {726 let id = self.0.value.expect("can't list root fields");727 let key = nixlike::escape_string(name);728 let query = format!("sess_field_{id} ? {key}");729 self.0730 .session731 .0732 .lock()733 .await734 .execute_expression_to_json(&query)735 .await736 .with_context(|| context(self.0.full_path.as_deref(), &query))737 }738 pub async fn list_fields(&self) -> Result<Vec<String>> {739 let id = self.0.value.expect("can't list root fields");740 let query = format!("builtins.attrNames sess_field_{id}");741 self.0742 .session743 .0744 .lock()745 .await746 .execute_expression_to_json(&query)747 .await748 .with_context(|| context(self.0.full_path.as_deref(), &query))749 }750 pub async fn type_of(&self) -> Result<String> {751 let id = self.0.value.expect("can't list root fields");752 let query = format!("builtins.typeOf sess_field_{id}");753 self.0754 .session755 .0756 .lock()757 .await758 .execute_expression_to_json(&query)759 .await760 .with_context(|| context(self.0.full_path.as_deref(), &query))761 }762 pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {763 let id = self.0.value.expect("can't use build on not-value");764 let query = format!(":b sess_field_{id}");765 let vid = self766 .0767 .session768 .0769 .lock()770 .await771 .execute_expression_raw(&query, &mut NixHandler::default())772 .await?;773 ensure!(774 !vid.is_empty(),775 "build failed: {}",776 context(self.0.full_path.as_deref(), &query),777 );778 let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")779 else {780 panic!("unexpected build output: {vid:?}");781 };782 let outputs = vid783 .split('\n')784 .filter(|v| !v.is_empty())785 .map(|v| v.split_once(" -> ").expect("unexpected build output"))786 .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))787 .collect();788 Ok(outputs)789 }790}791impl Drop for FieldInner {792 fn drop(&mut self) {793 if let Some(id) = self.value {794 if let Ok(mut lock) = self.session.0.try_lock() {795 lock.free_list.push(id)796 }797 798 }799 }800}801struct NixSessionPoolInner {802 flake: OsString,803 nix_args: Vec<OsString>,804}805806#[derive(Debug)]807pub struct NixPoolError(anyhow::Error);808impl From<anyhow::Error> for NixPoolError {809 fn from(value: anyhow::Error) -> Self {810 Self(value)811 }812}813impl std::error::Error for NixPoolError {}814impl std::fmt::Display for NixPoolError {815 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {816 self.0.fmt(f)817 }818}819impl r2d2::ManageConnection for NixSessionPoolInner {820 type Connection = NixSessionInner;821 type Error = NixPoolError;822 fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {823 let _v = TOKIO_RUNTIME824 .get()825 .expect("missed tokio runtime init!")826 .enter();827 Ok(futures::executor::block_on(NixSessionInner::new(828 self.flake.as_os_str(),829 self.nix_args.iter().map(OsString::as_os_str),830 ))?)831 }832833 fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {834 let _v = TOKIO_RUNTIME835 .get()836 .expect("missed tokio runtime init!")837 .enter();838 let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;839 if res != 4 {840 return Err(anyhow!("sanity check failed").into());841 };842 Ok(())843 }844845 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {846 false847 }848}849pub struct NixSessionPool(Pool<NixSessionPoolInner>);850impl NixSessionPool {851 pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {852 let inner = tokio::task::block_in_place(|| {853 r2d2::Builder::<NixSessionPoolInner>::new()854 .min_idle(Some(0))855 .build(NixSessionPoolInner { flake, nix_args })856 })?;857 Ok(Self(inner))858 }859 pub async fn get(&self) -> Result<NixSession> {860 let v = tokio::task::block_in_place(|| self.0.get())?;861 Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))862 }863}864865pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();