difftreelog
feat do not use build batching for single-host jobs
in: trunk
4 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1820,6 +1820,7 @@
name = "nix-native-eval"
version = "0.1.0"
dependencies = [
+ "anyhow",
"nixrs",
]
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -272,31 +272,23 @@
impl BuildSystems {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
- let hosts = config.list_hosts().await?;
+ let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
let set = LocalSet::new();
let build_attr = self.build_attr.clone();
- for host in hosts.into_iter() {
- if opts.should_skip(&host).await? {
- continue;
- }
+ let batch = (hosts.len() > 1).then(|| {
+ config
+ .nix_session
+ .new_build_batch("build-hosts".to_string())
+ });
+ for host in hosts {
let config = config.clone();
let span = info_span!("build", host = field::display(&host.name));
let hostname = host.name;
let build_attr = build_attr.clone();
- // FIXME: Since the introduction of better-nix-eval,
- // due to single repl used for builds, hosts are waiting for each other to build,
- // instead of building concurrently.
- //
- // Open multiple repls?
- //
- // Create build batcher, which will behave similar to golangs
- // WaitGroup, and start executing once all the build tasks are scheduled?
- // This also allows to cleanup build output, as there will be no longer
- // "waiting for remote machine" messages in the cases when one package is needed for
- // multiple hosts.
+ let batch = batch.clone();
set.spawn_local(
(async move {
- let built = match build_task(config, hostname.clone(), &build_attr, None).await
+ let built = match build_task(config, hostname.clone(), &build_attr, batch).await
{
Ok(path) => path,
Err(e) => {
@@ -316,6 +308,7 @@
.instrument(span),
);
}
+ drop(batch);
set.await;
Ok(())
}
@@ -323,20 +316,21 @@
impl Deploy {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
- let hosts = config.list_hosts().await?;
+ let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
let set = LocalSet::new();
- let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));
+ let batch = (hosts.len() > 1).then(|| {
+ config
+ .nix_session
+ .new_build_batch("deploy-hosts".to_string())
+ });
for host in hosts.into_iter() {
- if opts.should_skip(&host).await? {
- continue;
- }
let config = config.clone();
let span = info_span!("deploy", host = field::display(&host.name));
let hostname = host.name.clone();
let local_host = config.local_host();
let opts = opts.clone();
let batch = batch.clone();
- // FIXME: Fix repl concurrency (see build-systems)
+
set.spawn_local(
(async move {
let built =
crates/fleet-base/src/opts.rsdiffbeforeafterboth--- a/crates/fleet-base/src/opts.rs
+++ b/crates/fleet-base/src/opts.rs
@@ -91,6 +91,16 @@
}
impl FleetOpts {
+ pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
+ let mut out = Vec::new();
+ for host in hosts {
+ if self.should_skip(&host).await? {
+ continue;
+ }
+ out.push(host);
+ }
+ Ok(out)
+ }
pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
if self.skip.iter().any(|h| h as &str == host.name) {
return Ok(true);
crates/nix-eval/src/lib.rsdiffbeforeafterboth1//! This whole library should be replaced with either binding to nix libexpr,2//! or with tvix (once it is able to build NixOS).3//!4//! Current api is awful, little effort was put into this implementation.56use std::{collections::HashMap, path::PathBuf, sync::Arc};78pub use pool::NixSessionPool;9use pool::NixSessionPoolInner;10use r2d2::PooledConnection;11pub use session::{Error, Result};12use tokio::{13 sync::{mpsc, oneshot},14 task::AbortHandle,15};16use tracing::{info, instrument, Instrument};17pub use value::{Index, Value};1819mod pool;20mod session;21mod value;22// Contains macros helpers23#[doc(hidden)]24pub mod macros;25pub mod util;26// #[allow(non_upper_case_globals, non_camel_case_types, non_snake_case)]27// mod nix_raw {28// include!(concat!(env!("OUT_DIR"), "/bindings.rs"));29// }3031// fn init() {32// nix_raw::libutil_init();33// }3435#[derive(Clone)]36pub struct NixSession(pub(crate) Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);3738struct NixBuildTask(Value, oneshot::Sender<Result<HashMap<String, PathBuf>>>);3940#[derive(Clone)]41pub struct NixBuildBatch {42 tx: mpsc::UnboundedSender<NixBuildTask>,43}4445#[instrument(skip(session, values))]46async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {47 let builtins = Value::binding(session, "builtins").await?;48 let system = nix_go!(builtins.currentSystem);49 let drv = nix_go!(builtins.derivation(Obj {50 system,51 name,52 builder: "/bin/sh",53 // we want nothing from this derivation, it is only used to perform multiple builds at once.54 args: vec!["-c", "echo > $out"],55 preferLocalBuild: true,56 allowSubstitutes: false,57 buildInputs: values,58 }));59 drv.build().await?;60 Ok(())61}6263impl NixBuildBatch {64 fn new(name: String, session: NixSession) -> Self {65 let (tx, mut rx) = mpsc::unbounded_channel::<NixBuildTask>();6667 tokio::task::spawn(async move {68 let mut deps = vec![];69 let mut build_data = vec![];70 while let Some(task) = rx.recv().await {71 build_data.push(task.0.clone());72 deps.push(task);73 }74 if deps.is_empty() {75 return;76 }77 match build_multiple(name, session, build_data).await {78 Ok(_) => {79 for NixBuildTask(v, o) in deps {80 let _ = o.send(v.build().await);81 }82 }83 Err(e) => {84 for NixBuildTask(v, o) in deps {85 let s = v.to_string_weak().await.expect("drv is string-like");86 if PathBuf::from(s).exists() {87 let _ = o.send(v.build().await);88 } else {89 let _ = o.send(Err(e.clone()));90 }91 }92 }93 };94 });95 Self { tx }96 }97 pub async fn submit(self, task: Value) -> Result<HashMap<String, PathBuf>> {98 let Self { tx: task_tx } = self;99 let (tx, rx) = oneshot::channel();100 let _ = task_tx.send(NixBuildTask(task, tx));101 drop(task_tx);102 rx.await.expect("shoudn't be cancelled here")103 }104}105106impl NixSession {107 fn ptr_eq(a: &Self, b: &Self) -> bool {108 Arc::ptr_eq(&a.0, &b.0)109 }110111 pub fn new_build_batch(&self, name: String) -> NixBuildBatch {112 NixBuildBatch::new(name, self.clone())113 }114}115116pub fn init_tokio() {117 let _ = pool::TOKIO_RUNTIME.set(tokio::runtime::Handle::current());118}