git.delta.rocks / jrsonnet / refs/commits / e7a5b5f940a6

difftreelog

feat do not use build batching for single-host jobs

Yaroslav Bolyukin2024-11-14parent: #45f5af0.patch.diff
in: trunk

4 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1820,6 +1820,7 @@
 name = "nix-native-eval"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "nixrs",
 ]
 
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -272,31 +272,23 @@
 
 impl BuildSystems {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
-		let hosts = config.list_hosts().await?;
+		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
 		let set = LocalSet::new();
 		let build_attr = self.build_attr.clone();
-		for host in hosts.into_iter() {
-			if opts.should_skip(&host).await? {
-				continue;
-			}
+		let batch = (hosts.len() > 1).then(|| {
+			config
+				.nix_session
+				.new_build_batch("build-hosts".to_string())
+		});
+		for host in hosts {
 			let config = config.clone();
 			let span = info_span!("build", host = field::display(&host.name));
 			let hostname = host.name;
 			let build_attr = build_attr.clone();
-			// FIXME: Since the introduction of better-nix-eval,
-			// due to single repl used for builds, hosts are waiting for each other to build,
-			// instead of building concurrently.
-			//
-			// Open multiple repls?
-			//
-			// Create build batcher, which will behave similar to golangs
-			// WaitGroup, and start executing once all the build tasks are scheduled?
-			// This also allows to cleanup build output, as there will be no longer
-			// "waiting for remote machine" messages in the cases when one package is needed for
-			// multiple hosts.
+			let batch = batch.clone();
 			set.spawn_local(
 				(async move {
-					let built = match build_task(config, hostname.clone(), &build_attr, None).await
+					let built = match build_task(config, hostname.clone(), &build_attr, batch).await
 					{
 						Ok(path) => path,
 						Err(e) => {
@@ -316,6 +308,7 @@
 				.instrument(span),
 			);
 		}
+		drop(batch);
 		set.await;
 		Ok(())
 	}
@@ -323,20 +316,21 @@
 
 impl Deploy {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
-		let hosts = config.list_hosts().await?;
+		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
 		let set = LocalSet::new();
-		let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));
+		let batch = (hosts.len() > 1).then(|| {
+			config
+				.nix_session
+				.new_build_batch("deploy-hosts".to_string())
+		});
 		for host in hosts.into_iter() {
-			if opts.should_skip(&host).await? {
-				continue;
-			}
 			let config = config.clone();
 			let span = info_span!("deploy", host = field::display(&host.name));
 			let hostname = host.name.clone();
 			let local_host = config.local_host();
 			let opts = opts.clone();
 			let batch = batch.clone();
-			// FIXME: Fix repl concurrency (see build-systems)
+
 			set.spawn_local(
 				(async move {
 					let built =
modifiedcrates/fleet-base/src/opts.rsdiffbeforeafterboth
before · crates/fleet-base/src/opts.rs
1use std::{2	collections::BTreeMap,3	env::current_dir,4	ffi::OsString,5	str::FromStr,6	sync::{Arc, Mutex},7};89use anyhow::Result;10use clap::Parser;11use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};12use nom::{13	bytes::complete::take_while1,14	character::complete::char,15	combinator::{map, opt},16	multi::separated_list1,17	sequence::{preceded, separated_pair},18};1920use crate::{21	fleetdata::FleetData,22	host::{Config, ConfigHost, FleetConfigInternals},23};2425#[derive(Clone)]26pub enum HostItem {27	Host {28		name: String,29		attrs: BTreeMap<String, String>,30	},31	Tag {32		name: String,33		attrs: BTreeMap<String, String>,34	},35}36fn host_item_parser(input: &str) -> Result<HostItem, String> {37	fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {38		err.to_string()39	}4041	let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;42	let (input, name) = map(43		take_while1(|v| v != ',' && v != '?' && v != '@'),44		str::to_owned,45	)(input)46	.map_err(err_to_string)?;4748	let kw_item = separated_pair(49		map(take_while1(|v| v != '&' && v != '='), str::to_owned),50		char('='),51		map(take_while1(|v| v != '&'), str::to_owned),52	);53	let kw = map(separated_list1(char('&'), kw_item), |vec| {54		vec.into_iter().collect::<BTreeMap<_, _>>()55	});56	let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);5758	let (input, attrs) = opt_kw(input).map_err(err_to_string)?;5960	if !input.is_empty() {61		return Err(format!("unexpected trailing input: {input:?}"));62	}63	Ok(if is_tag {64		HostItem::Tag { name, attrs }65	} else {66		HostItem::Host { name, attrs }67	})68}6970// TODO: Rename to HostSelector71#[derive(Parser, Clone)]72pub struct FleetOpts {73	/// All hosts except those would be skipped74	#[clap(long, number_of_values = 1, value_parser = host_item_parser)]75	pub only: Vec<HostItem>,7677	/// Hosts to skip78	#[clap(long, number_of_values = 1)]79	pub skip: Vec<String>,8081	/// Host, which should be threaten as current machine82	// TODO: Replace with connectivity refactor83	#[clap(long, default_value_t = hostname::get().expect("unknown hostname").to_str().expect("hostname is not utf-8").to_owned())]84	pub localhost: String,8586	/// Override detected system for host, to perform builds via87	/// binfmt-declared qemu instead of trying to crosscompile88	// TODO: Remove, as it is not used anymore.89	#[clap(long, default_value = "detect")]90	pub local_system: String,91}9293impl FleetOpts {94	pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {95		if self.skip.iter().any(|h| h as &str == host.name) {96			return Ok(true);97		}98		if self.only.is_empty() {99			return Ok(false);100		}101		let mut have_group_matches = false;102		for item in self.only.iter() {103			match item {104				HostItem::Host { name, .. } if *name == host.name => {105					return Ok(false);106				}107				HostItem::Tag { .. } => {108					have_group_matches = true;109				}110				_ => {}111			}112		}113		if have_group_matches {114			let host_tags = host.tags().await?;115			for item in self.only.iter() {116				match item {117					HostItem::Tag { name, .. } if host_tags.contains(name) => {118						return Ok(false);119					}120					_ => {}121				}122			}123		}124		Ok(true)125	}126	pub async fn action_attr<T: FromStr>(&self, host: &ConfigHost, attr: &str) -> Result<Option<T>>127	where128		T::Err: Sync,129		anyhow::Error: From<T::Err>,130	{131		let str = self.action_attr_str(host, attr).await?;132		Ok(str.map(|v| T::from_str(&v)).transpose()?)133	}134	pub async fn action_attr_str(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {135		if self.only.is_empty() {136			return Ok(None);137		}138		let mut have_group_matches = false;139		for item in self.only.iter() {140			match item {141				HostItem::Host { name, attrs }142					if *name == host.name && attrs.contains_key(attr) =>143				{144					return Ok(attrs.get(attr).cloned());145				}146				HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {147					have_group_matches = true;148				}149				_ => {}150			}151		}152		if have_group_matches {153			let host_tags = host.tags().await?;154			for item in self.only.iter() {155				match item {156					HostItem::Tag { name, attrs }157						if host_tags.contains(name) && attrs.contains_key(attr) =>158					{159						return Ok(attrs.get(attr).cloned());160					}161					_ => {}162				}163			}164		}165		Ok(None)166	}167	pub fn is_local(&self, host: &str) -> bool {168		self.localhost == host169	}170171	// TODO: Config should be detached from opts.172	pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {173		let directory = current_dir()?;174175		let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;176		let nix_session = pool.get().await?;177178		let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;179		let local_system = if self.local_system == "detect" {180			nix_go_json!(builtins_field.currentSystem)181		} else {182			self.local_system.clone()183		};184185		let mut fleet_data_path = directory.clone();186		fleet_data_path.push("fleet.nix");187		let bytes = std::fs::read_to_string(fleet_data_path)?;188		let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;189190		let fleet_root = Value::binding(nix_session.clone(), "fleetConfigurations").await?;191		let fleet_field = nix_go!(fleet_root.default({ data }));192193		let config_field = nix_go!(fleet_field.config);194195		assert_warn("fleet config evaluation", &config_field).await?;196197		let import = nix_go!(builtins_field.import);198		let overlays = nix_go!(config_field.nixpkgs.overlays);199		let nixpkgs = nix_go!(config_field.nixpkgs.buildUsing | import);200201		let default_pkgs = nix_go!(nixpkgs(Obj {202			overlays,203			system: local_system.clone(),204		}));205206		Ok(Config(Arc::new(FleetConfigInternals {207			nix_session,208			directory,209			data,210			local_system,211			nix_args,212			config_field,213			default_pkgs,214			localhost: self.localhost.to_owned(),215		})))216	}217}
after · crates/fleet-base/src/opts.rs
1use std::{2	collections::BTreeMap,3	env::current_dir,4	ffi::OsString,5	str::FromStr,6	sync::{Arc, Mutex},7};89use anyhow::Result;10use clap::Parser;11use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};12use nom::{13	bytes::complete::take_while1,14	character::complete::char,15	combinator::{map, opt},16	multi::separated_list1,17	sequence::{preceded, separated_pair},18};1920use crate::{21	fleetdata::FleetData,22	host::{Config, ConfigHost, FleetConfigInternals},23};2425#[derive(Clone)]26pub enum HostItem {27	Host {28		name: String,29		attrs: BTreeMap<String, String>,30	},31	Tag {32		name: String,33		attrs: BTreeMap<String, String>,34	},35}36fn host_item_parser(input: &str) -> Result<HostItem, String> {37	fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {38		err.to_string()39	}4041	let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;42	let (input, name) = map(43		take_while1(|v| v != ',' && v != '?' && v != '@'),44		str::to_owned,45	)(input)46	.map_err(err_to_string)?;4748	let kw_item = separated_pair(49		map(take_while1(|v| v != '&' && v != '='), str::to_owned),50		char('='),51		map(take_while1(|v| v != '&'), str::to_owned),52	);53	let kw = map(separated_list1(char('&'), kw_item), |vec| {54		vec.into_iter().collect::<BTreeMap<_, _>>()55	});56	let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);5758	let (input, attrs) = opt_kw(input).map_err(err_to_string)?;5960	if !input.is_empty() {61		return Err(format!("unexpected trailing input: {input:?}"));62	}63	Ok(if is_tag {64		HostItem::Tag { name, attrs }65	} else {66		HostItem::Host { name, attrs }67	})68}6970// TODO: Rename to HostSelector71#[derive(Parser, Clone)]72pub struct FleetOpts {73	/// All hosts except those would be skipped74	#[clap(long, number_of_values = 1, value_parser = host_item_parser)]75	pub only: Vec<HostItem>,7677	/// Hosts to skip78	#[clap(long, number_of_values = 1)]79	pub skip: Vec<String>,8081	/// Host, which should be threaten as current machine82	// TODO: Replace with connectivity refactor83	#[clap(long, default_value_t = hostname::get().expect("unknown hostname").to_str().expect("hostname is not utf-8").to_owned())]84	pub localhost: String,8586	/// Override detected system for host, to perform builds via87	/// binfmt-declared qemu instead of trying to crosscompile88	// TODO: Remove, as it is not used anymore.89	#[clap(long, default_value = "detect")]90	pub local_system: String,91}9293impl FleetOpts {94	pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {95		let mut out = Vec::new();	96		for host in hosts {97			if self.should_skip(&host).await? {98				continue;99			}100			out.push(host);101		}102		Ok(out)103	}104	pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {105		if self.skip.iter().any(|h| h as &str == host.name) {106			return Ok(true);107		}108		if self.only.is_empty() {109			return Ok(false);110		}111		let mut have_group_matches = false;112		for item in self.only.iter() {113			match item {114				HostItem::Host { name, .. } if *name == host.name => {115					return Ok(false);116				}117				HostItem::Tag { .. } => {118					have_group_matches = true;119				}120				_ => {}121			}122		}123		if have_group_matches {124			let host_tags = host.tags().await?;125			for item in self.only.iter() {126				match item {127					HostItem::Tag { name, .. } if host_tags.contains(name) => {128						return Ok(false);129					}130					_ => {}131				}132			}133		}134		Ok(true)135	}136	pub async fn action_attr<T: FromStr>(&self, host: &ConfigHost, attr: &str) -> Result<Option<T>>137	where138		T::Err: Sync,139		anyhow::Error: From<T::Err>,140	{141		let str = self.action_attr_str(host, attr).await?;142		Ok(str.map(|v| T::from_str(&v)).transpose()?)143	}144	pub async fn action_attr_str(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {145		if self.only.is_empty() {146			return Ok(None);147		}148		let mut have_group_matches = false;149		for item in self.only.iter() {150			match item {151				HostItem::Host { name, attrs }152					if *name == host.name && attrs.contains_key(attr) =>153				{154					return Ok(attrs.get(attr).cloned());155				}156				HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {157					have_group_matches = true;158				}159				_ => {}160			}161		}162		if have_group_matches {163			let host_tags = host.tags().await?;164			for item in self.only.iter() {165				match item {166					HostItem::Tag { name, attrs }167						if host_tags.contains(name) && attrs.contains_key(attr) =>168					{169						return Ok(attrs.get(attr).cloned());170					}171					_ => {}172				}173			}174		}175		Ok(None)176	}177	pub fn is_local(&self, host: &str) -> bool {178		self.localhost == host179	}180181	// TODO: Config should be detached from opts.182	pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {183		let directory = current_dir()?;184185		let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;186		let nix_session = pool.get().await?;187188		let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;189		let local_system = if self.local_system == "detect" {190			nix_go_json!(builtins_field.currentSystem)191		} else {192			self.local_system.clone()193		};194195		let mut fleet_data_path = directory.clone();196		fleet_data_path.push("fleet.nix");197		let bytes = std::fs::read_to_string(fleet_data_path)?;198		let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;199200		let fleet_root = Value::binding(nix_session.clone(), "fleetConfigurations").await?;201		let fleet_field = nix_go!(fleet_root.default({ data }));202203		let config_field = nix_go!(fleet_field.config);204205		assert_warn("fleet config evaluation", &config_field).await?;206207		let import = nix_go!(builtins_field.import);208		let overlays = nix_go!(config_field.nixpkgs.overlays);209		let nixpkgs = nix_go!(config_field.nixpkgs.buildUsing | import);210211		let default_pkgs = nix_go!(nixpkgs(Obj {212			overlays,213			system: local_system.clone(),214		}));215216		Ok(Config(Arc::new(FleetConfigInternals {217			nix_session,218			directory,219			data,220			local_system,221			nix_args,222			config_field,223			default_pkgs,224			localhost: self.localhost.to_owned(),225		})))226	}227}
modifiedcrates/nix-eval/src/lib.rsdiffbeforeafterboth
--- a/crates/nix-eval/src/lib.rs
+++ b/crates/nix-eval/src/lib.rs
@@ -9,11 +9,8 @@
 use pool::NixSessionPoolInner;
 use r2d2::PooledConnection;
 pub use session::{Error, Result};
-use tokio::{
-	sync::{mpsc, oneshot},
-	task::AbortHandle,
-};
-use tracing::{info, instrument, Instrument};
+use tokio::sync::{mpsc, oneshot};
+use tracing::instrument;
 pub use value::{Index, Value};
 
 mod pool;