difftreelog
feat do not use build batching for single-host jobs
in: trunk
4 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1820,6 +1820,7 @@
name = "nix-native-eval"
version = "0.1.0"
dependencies = [
+ "anyhow",
"nixrs",
]
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -272,31 +272,23 @@
impl BuildSystems {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
- let hosts = config.list_hosts().await?;
+ let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
let set = LocalSet::new();
let build_attr = self.build_attr.clone();
- for host in hosts.into_iter() {
- if opts.should_skip(&host).await? {
- continue;
- }
+ let batch = (hosts.len() > 1).then(|| {
+ config
+ .nix_session
+ .new_build_batch("build-hosts".to_string())
+ });
+ for host in hosts {
let config = config.clone();
let span = info_span!("build", host = field::display(&host.name));
let hostname = host.name;
let build_attr = build_attr.clone();
- // FIXME: Since the introduction of better-nix-eval,
- // due to single repl used for builds, hosts are waiting for each other to build,
- // instead of building concurrently.
- //
- // Open multiple repls?
- //
- // Create build batcher, which will behave similar to golangs
- // WaitGroup, and start executing once all the build tasks are scheduled?
- // This also allows to cleanup build output, as there will be no longer
- // "waiting for remote machine" messages in the cases when one package is needed for
- // multiple hosts.
+ let batch = batch.clone();
set.spawn_local(
(async move {
- let built = match build_task(config, hostname.clone(), &build_attr, None).await
+ let built = match build_task(config, hostname.clone(), &build_attr, batch).await
{
Ok(path) => path,
Err(e) => {
@@ -316,6 +308,7 @@
.instrument(span),
);
}
+ drop(batch);
set.await;
Ok(())
}
@@ -323,20 +316,21 @@
impl Deploy {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
- let hosts = config.list_hosts().await?;
+ let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
let set = LocalSet::new();
- let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));
+ let batch = (hosts.len() > 1).then(|| {
+ config
+ .nix_session
+ .new_build_batch("deploy-hosts".to_string())
+ });
for host in hosts.into_iter() {
- if opts.should_skip(&host).await? {
- continue;
- }
let config = config.clone();
let span = info_span!("deploy", host = field::display(&host.name));
let hostname = host.name.clone();
let local_host = config.local_host();
let opts = opts.clone();
let batch = batch.clone();
- // FIXME: Fix repl concurrency (see build-systems)
+
set.spawn_local(
(async move {
let built =
crates/fleet-base/src/opts.rsdiffbeforeafterboth1use std::{2 collections::BTreeMap,3 env::current_dir,4 ffi::OsString,5 str::FromStr,6 sync::{Arc, Mutex},7};89use anyhow::Result;10use clap::Parser;11use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};12use nom::{13 bytes::complete::take_while1,14 character::complete::char,15 combinator::{map, opt},16 multi::separated_list1,17 sequence::{preceded, separated_pair},18};1920use crate::{21 fleetdata::FleetData,22 host::{Config, ConfigHost, FleetConfigInternals},23};2425#[derive(Clone)]26pub enum HostItem {27 Host {28 name: String,29 attrs: BTreeMap<String, String>,30 },31 Tag {32 name: String,33 attrs: BTreeMap<String, String>,34 },35}36fn host_item_parser(input: &str) -> Result<HostItem, String> {37 fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {38 err.to_string()39 }4041 let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;42 let (input, name) = map(43 take_while1(|v| v != ',' && v != '?' && v != '@'),44 str::to_owned,45 )(input)46 .map_err(err_to_string)?;4748 let kw_item = separated_pair(49 map(take_while1(|v| v != '&' && v != '='), str::to_owned),50 char('='),51 map(take_while1(|v| v != '&'), str::to_owned),52 );53 let kw = map(separated_list1(char('&'), kw_item), |vec| {54 vec.into_iter().collect::<BTreeMap<_, _>>()55 });56 let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);5758 let (input, attrs) = opt_kw(input).map_err(err_to_string)?;5960 if !input.is_empty() {61 return Err(format!("unexpected trailing input: {input:?}"));62 }63 Ok(if is_tag {64 HostItem::Tag { name, attrs }65 } else {66 HostItem::Host { name, attrs }67 })68}6970// TODO: Rename to HostSelector71#[derive(Parser, Clone)]72pub struct FleetOpts {73 /// All hosts except those would be skipped74 #[clap(long, number_of_values = 1, value_parser = host_item_parser)]75 pub only: Vec<HostItem>,7677 /// Hosts to skip78 #[clap(long, number_of_values = 1)]79 pub skip: Vec<String>,8081 /// Host, which should be threaten as current machine82 // TODO: Replace with connectivity refactor83 #[clap(long, default_value_t = hostname::get().expect("unknown hostname").to_str().expect("hostname is not utf-8").to_owned())]84 pub localhost: String,8586 /// Override detected system for host, to perform builds via87 /// binfmt-declared qemu instead of trying to crosscompile88 // TODO: Remove, as it is not used anymore.89 #[clap(long, default_value = "detect")]90 pub local_system: String,91}9293impl FleetOpts {94 pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {95 if self.skip.iter().any(|h| h as &str == host.name) {96 return Ok(true);97 }98 if self.only.is_empty() {99 return Ok(false);100 }101 let mut have_group_matches = false;102 for item in self.only.iter() {103 match item {104 HostItem::Host { name, .. } if *name == host.name => {105 return Ok(false);106 }107 HostItem::Tag { .. } => {108 have_group_matches = true;109 }110 _ => {}111 }112 }113 if have_group_matches {114 let host_tags = host.tags().await?;115 for item in self.only.iter() {116 match item {117 HostItem::Tag { name, .. } if host_tags.contains(name) => {118 return Ok(false);119 }120 _ => {}121 }122 }123 }124 Ok(true)125 }126 pub async fn action_attr<T: FromStr>(&self, host: &ConfigHost, attr: &str) -> Result<Option<T>>127 where128 T::Err: Sync,129 anyhow::Error: From<T::Err>,130 {131 let str = self.action_attr_str(host, attr).await?;132 Ok(str.map(|v| T::from_str(&v)).transpose()?)133 }134 pub async fn action_attr_str(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {135 if self.only.is_empty() {136 return Ok(None);137 }138 let mut have_group_matches = false;139 for item in self.only.iter() {140 match item {141 HostItem::Host { name, attrs }142 if *name == host.name && attrs.contains_key(attr) =>143 {144 return Ok(attrs.get(attr).cloned());145 }146 HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {147 have_group_matches = true;148 }149 _ => {}150 }151 }152 if have_group_matches {153 let host_tags = host.tags().await?;154 for item in self.only.iter() {155 match item {156 HostItem::Tag { name, attrs }157 if host_tags.contains(name) && attrs.contains_key(attr) =>158 {159 return Ok(attrs.get(attr).cloned());160 }161 _ => {}162 }163 }164 }165 Ok(None)166 }167 pub fn is_local(&self, host: &str) -> bool {168 self.localhost == host169 }170171 // TODO: Config should be detached from opts.172 pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {173 let directory = current_dir()?;174175 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;176 let nix_session = pool.get().await?;177178 let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;179 let local_system = if self.local_system == "detect" {180 nix_go_json!(builtins_field.currentSystem)181 } else {182 self.local_system.clone()183 };184185 let mut fleet_data_path = directory.clone();186 fleet_data_path.push("fleet.nix");187 let bytes = std::fs::read_to_string(fleet_data_path)?;188 let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;189190 let fleet_root = Value::binding(nix_session.clone(), "fleetConfigurations").await?;191 let fleet_field = nix_go!(fleet_root.default({ data }));192193 let config_field = nix_go!(fleet_field.config);194195 assert_warn("fleet config evaluation", &config_field).await?;196197 let import = nix_go!(builtins_field.import);198 let overlays = nix_go!(config_field.nixpkgs.overlays);199 let nixpkgs = nix_go!(config_field.nixpkgs.buildUsing | import);200201 let default_pkgs = nix_go!(nixpkgs(Obj {202 overlays,203 system: local_system.clone(),204 }));205206 Ok(Config(Arc::new(FleetConfigInternals {207 nix_session,208 directory,209 data,210 local_system,211 nix_args,212 config_field,213 default_pkgs,214 localhost: self.localhost.to_owned(),215 })))216 }217}1use std::{2 collections::BTreeMap,3 env::current_dir,4 ffi::OsString,5 str::FromStr,6 sync::{Arc, Mutex},7};89use anyhow::Result;10use clap::Parser;11use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};12use nom::{13 bytes::complete::take_while1,14 character::complete::char,15 combinator::{map, opt},16 multi::separated_list1,17 sequence::{preceded, separated_pair},18};1920use crate::{21 fleetdata::FleetData,22 host::{Config, ConfigHost, FleetConfigInternals},23};2425#[derive(Clone)]26pub enum HostItem {27 Host {28 name: String,29 attrs: BTreeMap<String, String>,30 },31 Tag {32 name: String,33 attrs: BTreeMap<String, String>,34 },35}36fn host_item_parser(input: &str) -> Result<HostItem, String> {37 fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {38 err.to_string()39 }4041 let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;42 let (input, name) = map(43 take_while1(|v| v != ',' && v != '?' && v != '@'),44 str::to_owned,45 )(input)46 .map_err(err_to_string)?;4748 let kw_item = separated_pair(49 map(take_while1(|v| v != '&' && v != '='), str::to_owned),50 char('='),51 map(take_while1(|v| v != '&'), str::to_owned),52 );53 let kw = map(separated_list1(char('&'), kw_item), |vec| {54 vec.into_iter().collect::<BTreeMap<_, _>>()55 });56 let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);5758 let (input, attrs) = opt_kw(input).map_err(err_to_string)?;5960 if !input.is_empty() {61 return Err(format!("unexpected trailing input: {input:?}"));62 }63 Ok(if is_tag {64 HostItem::Tag { name, attrs }65 } else {66 HostItem::Host { name, attrs }67 })68}6970// TODO: Rename to HostSelector71#[derive(Parser, Clone)]72pub struct FleetOpts {73 /// All hosts except those would be skipped74 #[clap(long, number_of_values = 1, value_parser = host_item_parser)]75 pub only: Vec<HostItem>,7677 /// Hosts to skip78 #[clap(long, number_of_values = 1)]79 pub skip: Vec<String>,8081 /// Host, which should be threaten as current machine82 // TODO: Replace with connectivity refactor83 #[clap(long, default_value_t = hostname::get().expect("unknown hostname").to_str().expect("hostname is not utf-8").to_owned())]84 pub localhost: String,8586 /// Override detected system for host, to perform builds via87 /// binfmt-declared qemu instead of trying to crosscompile88 // TODO: Remove, as it is not used anymore.89 #[clap(long, default_value = "detect")]90 pub local_system: String,91}9293impl FleetOpts {94 pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {95 let mut out = Vec::new(); 96 for host in hosts {97 if self.should_skip(&host).await? {98 continue;99 }100 out.push(host);101 }102 Ok(out)103 }104 pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {105 if self.skip.iter().any(|h| h as &str == host.name) {106 return Ok(true);107 }108 if self.only.is_empty() {109 return Ok(false);110 }111 let mut have_group_matches = false;112 for item in self.only.iter() {113 match item {114 HostItem::Host { name, .. } if *name == host.name => {115 return Ok(false);116 }117 HostItem::Tag { .. } => {118 have_group_matches = true;119 }120 _ => {}121 }122 }123 if have_group_matches {124 let host_tags = host.tags().await?;125 for item in self.only.iter() {126 match item {127 HostItem::Tag { name, .. } if host_tags.contains(name) => {128 return Ok(false);129 }130 _ => {}131 }132 }133 }134 Ok(true)135 }136 pub async fn action_attr<T: FromStr>(&self, host: &ConfigHost, attr: &str) -> Result<Option<T>>137 where138 T::Err: Sync,139 anyhow::Error: From<T::Err>,140 {141 let str = self.action_attr_str(host, attr).await?;142 Ok(str.map(|v| T::from_str(&v)).transpose()?)143 }144 pub async fn action_attr_str(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {145 if self.only.is_empty() {146 return Ok(None);147 }148 let mut have_group_matches = false;149 for item in self.only.iter() {150 match item {151 HostItem::Host { name, attrs }152 if *name == host.name && attrs.contains_key(attr) =>153 {154 return Ok(attrs.get(attr).cloned());155 }156 HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {157 have_group_matches = true;158 }159 _ => {}160 }161 }162 if have_group_matches {163 let host_tags = host.tags().await?;164 for item in self.only.iter() {165 match item {166 HostItem::Tag { name, attrs }167 if host_tags.contains(name) && attrs.contains_key(attr) =>168 {169 return Ok(attrs.get(attr).cloned());170 }171 _ => {}172 }173 }174 }175 Ok(None)176 }177 pub fn is_local(&self, host: &str) -> bool {178 self.localhost == host179 }180181 // TODO: Config should be detached from opts.182 pub async fn build(&self, nix_args: Vec<OsString>) -> Result<Config> {183 let directory = current_dir()?;184185 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;186 let nix_session = pool.get().await?;187188 let builtins_field = Value::binding(nix_session.clone(), "builtins").await?;189 let local_system = if self.local_system == "detect" {190 nix_go_json!(builtins_field.currentSystem)191 } else {192 self.local_system.clone()193 };194195 let mut fleet_data_path = directory.clone();196 fleet_data_path.push("fleet.nix");197 let bytes = std::fs::read_to_string(fleet_data_path)?;198 let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;199200 let fleet_root = Value::binding(nix_session.clone(), "fleetConfigurations").await?;201 let fleet_field = nix_go!(fleet_root.default({ data }));202203 let config_field = nix_go!(fleet_field.config);204205 assert_warn("fleet config evaluation", &config_field).await?;206207 let import = nix_go!(builtins_field.import);208 let overlays = nix_go!(config_field.nixpkgs.overlays);209 let nixpkgs = nix_go!(config_field.nixpkgs.buildUsing | import);210211 let default_pkgs = nix_go!(nixpkgs(Obj {212 overlays,213 system: local_system.clone(),214 }));215216 Ok(Config(Arc::new(FleetConfigInternals {217 nix_session,218 directory,219 data,220 local_system,221 nix_args,222 config_field,223 default_pkgs,224 localhost: self.localhost.to_owned(),225 })))226 }227}crates/nix-eval/src/lib.rsdiffbeforeafterboth--- a/crates/nix-eval/src/lib.rs
+++ b/crates/nix-eval/src/lib.rs
@@ -9,11 +9,8 @@
use pool::NixSessionPoolInner;
use r2d2::PooledConnection;
pub use session::{Error, Result};
-use tokio::{
- sync::{mpsc, oneshot},
- task::AbortHandle,
-};
-use tracing::{info, instrument, Instrument};
+use tokio::sync::{mpsc, oneshot};
+use tracing::instrument;
pub use value::{Index, Value};
mod pool;