git.delta.rocks / jrsonnet / refs/commits / e7a5b5f940a6

difftreelog

feat do not use build batching for single-host jobs

Yaroslav Bolyukin2024-11-14parent: #45f5af0.patch.diff
in: trunk

4 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1820,6 +1820,7 @@
 name = "nix-native-eval"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "nixrs",
 ]
 
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -272,31 +272,23 @@
 
 impl BuildSystems {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
-		let hosts = config.list_hosts().await?;
+		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
 		let set = LocalSet::new();
 		let build_attr = self.build_attr.clone();
-		for host in hosts.into_iter() {
-			if opts.should_skip(&host).await? {
-				continue;
-			}
+		let batch = (hosts.len() > 1).then(|| {
+			config
+				.nix_session
+				.new_build_batch("build-hosts".to_string())
+		});
+		for host in hosts {
 			let config = config.clone();
 			let span = info_span!("build", host = field::display(&host.name));
 			let hostname = host.name;
 			let build_attr = build_attr.clone();
-			// FIXME: Since the introduction of better-nix-eval,
-			// due to single repl used for builds, hosts are waiting for each other to build,
-			// instead of building concurrently.
-			//
-			// Open multiple repls?
-			//
-			// Create build batcher, which will behave similar to golangs
-			// WaitGroup, and start executing once all the build tasks are scheduled?
-			// This also allows to cleanup build output, as there will be no longer
-			// "waiting for remote machine" messages in the cases when one package is needed for
-			// multiple hosts.
+			let batch = batch.clone();
 			set.spawn_local(
 				(async move {
-					let built = match build_task(config, hostname.clone(), &build_attr, None).await
+					let built = match build_task(config, hostname.clone(), &build_attr, batch).await
 					{
 						Ok(path) => path,
 						Err(e) => {
@@ -316,6 +308,7 @@
 				.instrument(span),
 			);
 		}
+		drop(batch);
 		set.await;
 		Ok(())
 	}
@@ -323,20 +316,21 @@
 
 impl Deploy {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
-		let hosts = config.list_hosts().await?;
+		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
 		let set = LocalSet::new();
-		let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));
+		let batch = (hosts.len() > 1).then(|| {
+			config
+				.nix_session
+				.new_build_batch("deploy-hosts".to_string())
+		});
 		for host in hosts.into_iter() {
-			if opts.should_skip(&host).await? {
-				continue;
-			}
 			let config = config.clone();
 			let span = info_span!("deploy", host = field::display(&host.name));
 			let hostname = host.name.clone();
 			let local_host = config.local_host();
 			let opts = opts.clone();
 			let batch = batch.clone();
-			// FIXME: Fix repl concurrency (see build-systems)
+
 			set.spawn_local(
 				(async move {
 					let built =
modifiedcrates/fleet-base/src/opts.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/opts.rs
+++ b/crates/fleet-base/src/opts.rs
@@ -91,6 +91,16 @@
 }
 
 impl FleetOpts {
+	pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
+		let mut out = Vec::new();	
+		for host in hosts {
+			if self.should_skip(&host).await? {
+				continue;
+			}
+			out.push(host);
+		}
+		Ok(out)
+	}
 	pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
 		if self.skip.iter().any(|h| h as &str == host.name) {
 			return Ok(true);
modifiedcrates/nix-eval/src/lib.rsdiffbeforeafterboth
before · crates/nix-eval/src/lib.rs
1//! This whole library should be replaced with either binding to nix libexpr,2//! or with tvix (once it is able to build NixOS).3//!4//! Current api is awful, little effort was put into this implementation.56use std::{collections::HashMap, path::PathBuf, sync::Arc};78pub use pool::NixSessionPool;9use pool::NixSessionPoolInner;10use r2d2::PooledConnection;11pub use session::{Error, Result};12use tokio::{13	sync::{mpsc, oneshot},14	task::AbortHandle,15};16use tracing::{info, instrument, Instrument};17pub use value::{Index, Value};1819mod pool;20mod session;21mod value;22// Contains macros helpers23#[doc(hidden)]24pub mod macros;25pub mod util;26// #[allow(non_upper_case_globals, non_camel_case_types, non_snake_case)]27// mod nix_raw {28// 	include!(concat!(env!("OUT_DIR"), "/bindings.rs"));29// }3031// fn init() {32// 	nix_raw::libutil_init();33// }3435#[derive(Clone)]36pub struct NixSession(pub(crate) Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);3738struct NixBuildTask(Value, oneshot::Sender<Result<HashMap<String, PathBuf>>>);3940#[derive(Clone)]41pub struct NixBuildBatch {42	tx: mpsc::UnboundedSender<NixBuildTask>,43}4445#[instrument(skip(session, values))]46async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {47	let builtins = Value::binding(session, "builtins").await?;48	let system = nix_go!(builtins.currentSystem);49	let drv = nix_go!(builtins.derivation(Obj {50		system,51		name,52		builder: "/bin/sh",53		// we want nothing from this derivation, it is only used to perform multiple builds at once.54		args: vec!["-c", "echo > $out"],55		preferLocalBuild: true,56		allowSubstitutes: false,57		buildInputs: values,58	}));59	drv.build().await?;60	Ok(())61}6263impl NixBuildBatch {64	fn new(name: String, session: NixSession) -> Self {65		let (tx, mut rx) = mpsc::unbounded_channel::<NixBuildTask>();6667		tokio::task::spawn(async move {68			let mut deps = vec![];69			let mut build_data = vec![];70			while let Some(task) = rx.recv().await {71				build_data.push(task.0.clone());72				deps.push(task);73			}74			if deps.is_empty() {75				return;76			}77			match build_multiple(name, session, build_data).await {78				Ok(_) => {79					for NixBuildTask(v, o) in deps {80						let _ = o.send(v.build().await);81					}82				}83				Err(e) => {84					for NixBuildTask(v, o) in deps {85						let s = v.to_string_weak().await.expect("drv is string-like");86						if PathBuf::from(s).exists() {87							let _ = o.send(v.build().await);88						} else {89							let _ = o.send(Err(e.clone()));90						}91					}92				}93			};94		});95		Self { tx }96	}97	pub async fn submit(self, task: Value) -> Result<HashMap<String, PathBuf>> {98		let Self { tx: task_tx } = self;99		let (tx, rx) = oneshot::channel();100		let _ = task_tx.send(NixBuildTask(task, tx));101		drop(task_tx);102		rx.await.expect("shoudn't be cancelled here")103	}104}105106impl NixSession {107	fn ptr_eq(a: &Self, b: &Self) -> bool {108		Arc::ptr_eq(&a.0, &b.0)109	}110111	pub fn new_build_batch(&self, name: String) -> NixBuildBatch {112		NixBuildBatch::new(name, self.clone())113	}114}115116pub fn init_tokio() {117	let _ = pool::TOKIO_RUNTIME.set(tokio::runtime::Handle::current());118}
after · crates/nix-eval/src/lib.rs
1//! This whole library should be replaced with either binding to nix libexpr,2//! or with tvix (once it is able to build NixOS).3//!4//! Current api is awful, little effort was put into this implementation.56use std::{collections::HashMap, path::PathBuf, sync::Arc};78pub use pool::NixSessionPool;9use pool::NixSessionPoolInner;10use r2d2::PooledConnection;11pub use session::{Error, Result};12use tokio::sync::{mpsc, oneshot};13use tracing::instrument;14pub use value::{Index, Value};1516mod pool;17mod session;18mod value;19// Contains macros helpers20#[doc(hidden)]21pub mod macros;22pub mod util;23// #[allow(non_upper_case_globals, non_camel_case_types, non_snake_case)]24// mod nix_raw {25// 	include!(concat!(env!("OUT_DIR"), "/bindings.rs"));26// }2728// fn init() {29// 	nix_raw::libutil_init();30// }3132#[derive(Clone)]33pub struct NixSession(pub(crate) Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);3435struct NixBuildTask(Value, oneshot::Sender<Result<HashMap<String, PathBuf>>>);3637#[derive(Clone)]38pub struct NixBuildBatch {39	tx: mpsc::UnboundedSender<NixBuildTask>,40}4142#[instrument(skip(session, values))]43async fn build_multiple(name: String, session: NixSession, values: Vec<Value>) -> Result<()> {44	let builtins = Value::binding(session, "builtins").await?;45	let system = nix_go!(builtins.currentSystem);46	let drv = nix_go!(builtins.derivation(Obj {47		system,48		name,49		builder: "/bin/sh",50		// we want nothing from this derivation, it is only used to perform multiple builds at once.51		args: vec!["-c", "echo > $out"],52		preferLocalBuild: true,53		allowSubstitutes: false,54		buildInputs: values,55	}));56	drv.build().await?;57	Ok(())58}5960impl NixBuildBatch {61	fn new(name: String, session: NixSession) -> Self {62		let (tx, mut rx) = mpsc::unbounded_channel::<NixBuildTask>();6364		tokio::task::spawn(async move {65			let mut deps = vec![];66			let mut build_data = vec![];67			while let Some(task) = rx.recv().await {68				build_data.push(task.0.clone());69				deps.push(task);70			}71			if deps.is_empty() {72				return;73			}74			match build_multiple(name, session, build_data).await {75				Ok(_) => {76					for NixBuildTask(v, o) in deps {77						let _ = o.send(v.build().await);78					}79				}80				Err(e) => {81					for NixBuildTask(v, o) in deps {82						let s = v.to_string_weak().await.expect("drv is string-like");83						if PathBuf::from(s).exists() {84							let _ = o.send(v.build().await);85						} else {86							let _ = o.send(Err(e.clone()));87						}88					}89				}90			};91		});92		Self { tx }93	}94	pub async fn submit(self, task: Value) -> Result<HashMap<String, PathBuf>> {95		let Self { tx: task_tx } = self;96		let (tx, rx) = oneshot::channel();97		let _ = task_tx.send(NixBuildTask(task, tx));98		drop(task_tx);99		rx.await.expect("shoudn't be cancelled here")100	}101}102103impl NixSession {104	fn ptr_eq(a: &Self, b: &Self) -> bool {105		Arc::ptr_eq(&a.0, &b.0)106	}107108	pub fn new_build_batch(&self, name: String) -> NixBuildBatch {109		NixBuildBatch::new(name, self.clone())110	}111}112113pub fn init_tokio() {114	let _ = pool::TOKIO_RUNTIME.set(tokio::runtime::Handle::current());115}