git.delta.rocks / jrsonnet / refs/commits / e7a5b5f940a6

difftreelog

feat do not use build batching for single-host jobs

Yaroslav Bolyukin2024-11-14parent: #45f5af0.patch.diff
in: trunk

4 files changed

modifiedCargo.lockdiffbeforeafterboth
1820name = "nix-native-eval"1820name = "nix-native-eval"
1821version = "0.1.0"1821version = "0.1.0"
1822dependencies = [1822dependencies = [
1823 "anyhow",
1823 "nixrs",1824 "nixrs",
1824]1825]
18251826
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
272272
273impl BuildSystems {273impl BuildSystems {
274 pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {274 pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
275 let hosts = config.list_hosts().await?;275 let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
276 let set = LocalSet::new();276 let set = LocalSet::new();
277 let build_attr = self.build_attr.clone();277 let build_attr = self.build_attr.clone();
278 let batch = (hosts.len() > 1).then(|| {
279 config
280 .nix_session
281 .new_build_batch("build-hosts".to_string())
282 });
278 for host in hosts.into_iter() {283 for host in hosts {
279 if opts.should_skip(&host).await? {
280 continue;
281 }
282 let config = config.clone();284 let config = config.clone();
283 let span = info_span!("build", host = field::display(&host.name));285 let span = info_span!("build", host = field::display(&host.name));
284 let hostname = host.name;286 let hostname = host.name;
285 let build_attr = build_attr.clone();287 let build_attr = build_attr.clone();
286 // FIXME: Since the introduction of better-nix-eval,288 let batch = batch.clone();
287 // due to single repl used for builds, hosts are waiting for each other to build,
288 // instead of building concurrently.
289 //
290 // Open multiple repls?
291 //
292 // Create build batcher, which will behave similar to golangs
293 // WaitGroup, and start executing once all the build tasks are scheduled?
294 // This also allows to cleanup build output, as there will be no longer
295 // "waiting for remote machine" messages in the cases when one package is needed for
296 // multiple hosts.
297 set.spawn_local(289 set.spawn_local(
298 (async move {290 (async move {
299 let built = match build_task(config, hostname.clone(), &build_attr, None).await291 let built = match build_task(config, hostname.clone(), &build_attr, batch).await
300 {292 {
301 Ok(path) => path,293 Ok(path) => path,
302 Err(e) => {294 Err(e) => {
316 .instrument(span),308 .instrument(span),
317 );309 );
318 }310 }
311 drop(batch);
319 set.await;312 set.await;
320 Ok(())313 Ok(())
321 }314 }
322}315}
323316
324impl Deploy {317impl Deploy {
325 pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {318 pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
326 let hosts = config.list_hosts().await?;319 let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
327 let set = LocalSet::new();320 let set = LocalSet::new();
328 let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));321 let batch = (hosts.len() > 1).then(|| {
322 config
323 .nix_session
324 .new_build_batch("deploy-hosts".to_string())
325 });
329 for host in hosts.into_iter() {326 for host in hosts.into_iter() {
330 if opts.should_skip(&host).await? {
331 continue;
332 }
333 let config = config.clone();327 let config = config.clone();
334 let span = info_span!("deploy", host = field::display(&host.name));328 let span = info_span!("deploy", host = field::display(&host.name));
335 let hostname = host.name.clone();329 let hostname = host.name.clone();
336 let local_host = config.local_host();330 let local_host = config.local_host();
337 let opts = opts.clone();331 let opts = opts.clone();
338 let batch = batch.clone();332 let batch = batch.clone();
339 // FIXME: Fix repl concurrency (see build-systems)333
340 set.spawn_local(334 set.spawn_local(
341 (async move {335 (async move {
342 let built =336 let built =
modifiedcrates/fleet-base/src/opts.rsdiffbeforeafterboth
91}91}
9292
93impl FleetOpts {93impl FleetOpts {
94 pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
95 let mut out = Vec::new();
96 for host in hosts {
97 if self.should_skip(&host).await? {
98 continue;
99 }
100 out.push(host);
101 }
102 Ok(out)
103 }
94 pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {104 pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
95 if self.skip.iter().any(|h| h as &str == host.name) {105 if self.skip.iter().any(|h| h as &str == host.name) {
96 return Ok(true);106 return Ok(true);
modifiedcrates/nix-eval/src/lib.rsdiffbeforeafterboth
9use pool::NixSessionPoolInner;9use pool::NixSessionPoolInner;
10use r2d2::PooledConnection;10use r2d2::PooledConnection;
11pub use session::{Error, Result};11pub use session::{Error, Result};
12use tokio::{12use tokio::sync::{mpsc, oneshot};
13 sync::{mpsc, oneshot},
14 task::AbortHandle,
15};
16use tracing::{info, instrument, Instrument};13use tracing::instrument;
17pub use value::{Index, Value};14pub use value::{Index, Value};
1815
19mod pool;16mod pool;