123456use std::{collections::HashMap, path::PathBuf, sync::Arc};78pub use pool::NixSessionPool;9use pool::NixSessionPoolInner;10use r2d2::PooledConnection;11pub use session::{Error, Result};12use tokio::sync::{mpsc, oneshot};13use tracing::instrument;14pub use value::{Index, Value};1516mod pool;17mod session;18mod value;1920#[doc(hidden)]21pub mod macros;22pub mod util;23242526272829303132#[derive(Clone)]33pub struct NixSession(pub(crate) Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);3435struct NixBuildTask(Value, oneshot::Sender<Result<HashMap<String, PathBuf>>>);3637#[derive(Clone)]38pub struct NixBuildBatch {39 tx: mpsc::UnboundedSender<NixBuildTask>,40}4142#[instrument(skip(session, values))]43async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {44 let builtins = Value::binding(session, "builtins").await?;45 let system = nix_go!(builtins.currentSystem);46 let drv = nix_go!(builtins.derivation(Obj {47 system,48 name,49 builder: "/bin/sh",50 51 args: vec!["-c", "echo > $out"],52 preferLocalBuild: true,53 allowSubstitutes: false,54 buildInputs: values,55 }));56 drv.build().await?;57 Ok(())58}5960impl NixBuildBatch {61 fn new(name: String, session: NixSession) -> Self {62 let (tx, mut rx) = mpsc::unbounded_channel::<NixBuildTask>();6364 tokio::task::spawn(async move {65 let mut deps = vec![];66 let mut build_data = vec![];67 while let Some(task) = rx.recv().await {68 build_data.push(task.0.clone());69 deps.push(task);70 }71 if deps.is_empty() {72 return;73 }74 match build_multiple(name, session, build_data).await {75 Ok(_) => {76 for NixBuildTask(v, o) in deps {77 let _ = o.send(v.build().await);78 }79 }80 Err(e) => {81 for NixBuildTask(v, o) in deps {82 let s = v.to_string_weak().await.expect("drv is string-like");83 if PathBuf::from(s).exists() {84 let _ = o.send(v.build().await);85 } else {86 let _ = o.send(Err(e.clone()));87 }88 }89 }90 };91 });92 Self { tx }93 }94 pub async fn submit(self, task: Value) -> Result<HashMap<String, PathBuf>> {95 let Self { tx: task_tx } = self;96 let (tx, rx) = oneshot::channel();97 let _ = task_tx.send(NixBuildTask(task, tx));98 drop(task_tx);99 rx.await.expect("shoudn't be cancelled here")100 }101}102103impl NixSession {104 fn ptr_eq(a: &Self, b: &Self) -> bool {105 Arc::ptr_eq(&a.0, &b.0)106 }107108 pub fn new_build_batch(&self, name: String) -> NixBuildBatch {109 NixBuildBatch::new(name, self.clone())110 }111}112113pub fn init_tokio() {114 let _ = pool::TOKIO_RUNTIME.set(tokio::runtime::Handle::current());115}