git.delta.rocks / jrsonnet / refs/commits / 81e3c77f96ff

difftreelog

feat parallel host builds

Yaroslav Bolyukin2021-12-26parent: #6a5196a.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
 use std::{env::current_dir, time::Duration};
 
-use crate::{command::CommandExt, host::Config, nix::SYSTEMS_ATTRIBUTE};
+use crate::{command::CommandExt, host::Config};
 use anyhow::Result;
 use structopt::StructOpt;
 use tokio::{process::Command, task::LocalSet, time::sleep};
@@ -21,6 +21,8 @@
 	privileged_build: bool,
 	#[structopt(subcommand)]
 	subcommand: Subcommand,
+	#[structopt(long)]
+	show_trace: bool,
 }
 
 enum UploadAction {
@@ -79,6 +81,137 @@
 }
 
 impl BuildSystems {
+	async fn build_task(self, config: Config, host: String) -> Result<()> {
+		info!("building");
+		let built = {
+			let dir = tempfile::tempdir()?;
+			dir.path().to_owned()
+		};
+
+		let mut nix_build = if self.privileged_build {
+			let mut out = Command::new("sudo");
+			out.arg("nix");
+			out
+		} else {
+			Command::new("nix")
+		};
+		nix_build
+			.args(&[
+				"build",
+				"--impure",
+				"--json",
+				// "--show-trace",
+				"--no-link",
+				"--out-link",
+			])
+			.arg(&built)
+			.arg(config.configuration_attr_name(&format!(
+				"configuredSystems.{}.config.system.build.toplevel",
+				host
+			)));
+
+		if self.show_trace {
+			nix_build.arg("--show-trace");
+		}
+		if let Some(builders) = &self.builders {
+			nix_build.arg("--builders").arg(builders);
+		}
+		if let Some(jobs) = &self.jobs {
+			nix_build.arg("--max-jobs");
+			nix_build.arg(format!("{}", jobs));
+		}
+		if !self.fail_fast {
+			nix_build.arg("--keep-going");
+		}
+
+		nix_build.run_nix().await?;
+		let built = std::fs::canonicalize(built)?;
+
+		let action = Action::from(self.subcommand.clone());
+
+		match action {
+			Action::Upload(action) => {
+				if !config.is_local(&host) {
+					info!("uploading system closure");
+					let mut tries = 0;
+					loop {
+						match Command::new("nix")
+							.args(&["copy", "--to"])
+							.arg(format!("ssh://root@{}", host))
+							.arg(&built)
+							.inherit_stdio()
+							.run_nix()
+							.await
+						{
+							Ok(()) => break,
+							Err(e) if tries < 3 => {
+								tries += 1;
+								warn!("Copy failure ({}/3): {}", tries, e);
+								sleep(Duration::from_millis(5000)).await;
+							}
+							Err(e) => return Err(e),
+						}
+					}
+				}
+				if let Some(action) = action {
+					if action.should_switch_profile() {
+						info!("switching generation");
+						config
+							.command_on(&host, "nix-env", true)
+							.args(&["-p", "/nix/var/nix/profiles/system", "--set"])
+							.arg(&built)
+							.inherit_stdio()
+							.run()
+							.await?;
+					}
+					info!("executing activation script");
+					let mut switch_script = built.clone();
+					switch_script.push("bin");
+					switch_script.push("switch-to-configuration");
+					config
+						.command_on(&host, switch_script, true)
+						.arg(action.name())
+						.inherit_stdio()
+						.run()
+						.await?;
+				}
+			}
+			Action::Package(PackageAction::SdImage) => {
+				let mut out = current_dir()?;
+				out.push(format!("sd-image-{}", host));
+
+				info!("building sd image to {:?}", out);
+				let mut nix_build = if self.privileged_build {
+					let mut out = Command::new("sudo");
+					out.arg("nix");
+					out
+				} else {
+					Command::new("nix")
+				};
+				nix_build
+					.args(&["build", "--impure", "--no-link", "--out-link"])
+					.arg(&out)
+					.arg(config.configuration_attr_name(&format!(
+						"configuredSystems.{}.config.system.build.sdImage",
+						host,
+					)));
+				if let Some(builders) = &self.builders {
+					nix_build.arg("--builders").arg(builders);
+				}
+				if let Some(jobs) = &self.jobs {
+					nix_build.arg("--max-jobs");
+					nix_build.arg(format!("{}", jobs));
+				}
+				if !self.fail_fast {
+					nix_build.arg("--keep-going");
+				}
+
+				nix_build.inherit_stdio().run_nix().await?;
+			}
+		};
+		Ok(())
+	}
+
 	pub async fn run(self, config: &Config) -> Result<()> {
 		let hosts = config.list_hosts().await?;
 		let set = LocalSet::new();
@@ -93,139 +226,12 @@
 			let span = info_span!("deployment", host = field::display(&host));
 			set.spawn_local(
 				(async move {
-					let res: Result<()> = try {
-						info!("building");
-						let built = {
-							let dir = tempfile::tempdir()?;
-							dir.path().to_owned()
-						};
-
-						let mut nix_build = if this.privileged_build {
-							let mut out = Command::new("sudo");
-							out.arg("nix");
-							out
-						} else {
-							Command::new("nix")
-						};
-						nix_build
-							.args(&[
-								"build",
-								"--impure",
-								"--json",
-								// "--show-trace",
-								"--no-link",
-								"--out-link",
-							])
-							.arg(&built)
-							.arg(format!(
-								"{}.{}.config.system.build.toplevel",
-								SYSTEMS_ATTRIBUTE, host,
-							));
-
-						if let Some(builders) = &this.builders {
-							nix_build.arg("--builders").arg(builders);
-						}
-						if let Some(jobs) = &this.jobs {
-							nix_build.arg("--max-jobs");
-							nix_build.arg(format!("{}", jobs));
-						}
-						if !this.fail_fast {
-							nix_build.arg("--keep-going");
-						}
-
-						nix_build.run_nix().await?;
-						let built = std::fs::canonicalize(built)?;
-
-						let action = Action::from(this.subcommand.clone());
-
-						match action {
-							Action::Upload(action) => {
-								if !config.is_local(&host) {
-									info!("uploading system closure");
-									let mut tries = 0;
-									loop {
-										match Command::new("nix")
-											.args(&["copy", "--to"])
-											.arg(format!("ssh://root@{}", host))
-											.arg(&built)
-											.inherit_stdio()
-											.run_nix()
-											.await
-										{
-											Ok(()) => break,
-											Err(e) if tries < 3 => {
-												tries += 1;
-												warn!("Copy failure ({}/3): {}", tries, e);
-												sleep(Duration::from_millis(5000)).await;
-											}
-											Err(e) => return Err(e),
-										}
-									}
-								}
-								if let Some(action) = action {
-									if action.should_switch_profile() {
-										info!("switching generation");
-										config
-											.command_on(&host, "nix-env", true)
-											.args(&["-p", "/nix/var/nix/profiles/system", "--set"])
-											.arg(&built)
-											.inherit_stdio()
-											.run()
-											.await?;
-									}
-									info!("executing activation script");
-									let mut switch_script = built.clone();
-									switch_script.push("bin");
-									switch_script.push("switch-to-configuration");
-									config
-										.command_on(&host, switch_script, true)
-										.arg(action.name())
-										.inherit_stdio()
-										.run()
-										.await?;
-								}
-							}
-							Action::Package(PackageAction::SdImage) => {
-								let mut out = current_dir()?;
-								out.push(format!("sd-image-{}", host));
-
-								info!("building sd image to {:?}", out);
-								let mut nix_build = if this.privileged_build {
-									let mut out = Command::new("sudo");
-									out.arg("nix");
-									out
-								} else {
-									Command::new("nix")
-								};
-								nix_build
-									.args(&["build", "--impure", "--no-link", "--out-link"])
-									.arg(&out)
-									.arg(format!(
-										"{}.{}.config.system.build.sdImage",
-										SYSTEMS_ATTRIBUTE, host,
-									));
-								if let Some(builders) = &this.builders {
-									nix_build.arg("--builders").arg(builders);
-								}
-								if let Some(jobs) = &this.jobs {
-									nix_build.arg("--max-jobs");
-									nix_build.arg(format!("{}", jobs));
-								}
-								if !this.fail_fast {
-									nix_build.arg("--keep-going");
-								}
-
-								nix_build.inherit_stdio().run_nix().await?;
-							}
-						};
-					};
-					match res {
+					match this.build_task(config, host).await {
 						Ok(_) => {}
 						Err(e) => {
 							error!("failed to deploy host: {}", e)
 						}
 					}
-					Ok(())
 				})
 				.instrument(span),
 			);
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
before · cmds/fleet/src/command.rs
1use std::{ffi::OsStr, process::Stdio};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7	de::{DeserializeOwned, Visitor},8	Deserialize, 9};10use tokio::{process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314#[async_trait]15pub trait CommandExt {16	async fn run_nix(&mut self) -> Result<()>;17	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;18	async fn run_nix_string(&mut self) -> Result<String>;19	async fn run(&mut self) -> Result<()>;20	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;21	async fn run_string(&mut self) -> Result<String>;22	fn inherit_stdio(&mut self) -> &mut Self;23	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;24}2526#[derive(Debug)]27enum LogField {28	String(String),29	Num(u64),30}3132impl<'de> Deserialize<'de> for LogField {33	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>34	where35		D: serde::Deserializer<'de>,36	{37		struct StringOrNum;38		impl<'de> Visitor<'de> for StringOrNum {39			type Value = LogField;4041			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {42				write!(f, "string or unsigned")43			}4445			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>46			where47				E: serde::de::Error,48			{49				Ok(LogField::String(v.to_owned()))50			}5152			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>53			where54				E: serde::de::Error,55			{56				Ok(LogField::Num(v))57			}58		}5960		deserializer.deserialize_any(StringOrNum)61	}62}6364#[derive(Deserialize, Debug)]65#[serde(rename_all = "camelCase", tag = "action")]66enum NixLog {67	Msg {68		level: u32,69		msg: String,70		raw_msg: Option<String>,71	},72	Start {73		id: u64,74		level: u32,75		#[serde(default)]76		fields: Vec<LogField>,77		text: String,78		#[serde(rename = "type")]79		typ: u32,80	},81	Stop {82		id: u64,83	},84	Result {85		id: u64,86		#[serde(rename = "type")]87		typ: u32,88	},89}9091#[async_trait]92impl CommandExt for Command {93	async fn run_nix(&mut self) -> Result<()> {94		self.run_nix_string().await.map(|_| ())95	}96	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {97		let str = self.run_nix_string().await?;98		serde_json::from_str(&str).with_context(|| format!("{:?}", str))99	}100101	async fn run_nix_string(&mut self) -> Result<String> {102		self.arg("--log-format").arg("internal-json");103		self.stderr(Stdio::piped());104		self.stdout(Stdio::piped());105		let mut child = self.spawn()?;106		let mut stderr = child.stderr.take().unwrap();107		let mut stdout = child.stdout.take().unwrap();108		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());109		let mut out = FramedRead::new(&mut stdout, BytesCodec::new());110111		// while let Some(line) = read.next().await? {}112113		let mut out_buf = vec![];114		loop {115			select! {116				e = err.next() => {117					if let Some(e) = e {118						let e = e?;119						if let Some(e) = e.strip_prefix("@nix ") {120121							let log: NixLog = match serde_json::from_str(e) {122								Ok(l) => l,123								Err(err) => {124									warn!("failed to parse nix log line {:?}: {}", e, err);125									continue;126								},127							};128							match log {129								NixLog::Msg { msg, raw_msg, .. } => {130									if !(msg.ends_with(" is dirty") && msg.contains("warning:") && msg.contains(" Git tree ")) {131										info!(target: "nix", "{}", raw_msg.unwrap_or(msg))132									}133								},134								NixLog::Start { ref fields, typ, .. } if typ == 105 && fields.len() >= 1 => {135									if let [LogField::String(drv), ..] = &fields[..] {136										info!(target: "nix","building {}", drv)137									} else {138										warn!("bad build log: {:?}", log)139									}140								},141								NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {142									if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {143										info!(target: "nix","copying {} {} -> {}", drv, from, to)144									} else {145										warn!("bad copy log: {:?}", log)146									}147								},148								NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {149									if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {150										info!(target: "nix", "{}", text)151									}152								},153								NixLog::Stop { .. } => {},154								NixLog::Result { .. } => {},155								_ => warn!("unknown log: {:?}", log)156							};157						} else {158							warn!(target="nix","unknown: {}", e)159						}160					}161				},162				o = out.next() => {163					if let Some(o) = o {164						out_buf.extend_from_slice(&o?);165					}166				},167				code = child.wait() => {168					let code = code?;169					if !code.success() {170						anyhow::bail!("command ({:?}) failed with status {}", self, code);171					}172					break;173				}174			}175		}176177		Ok(String::from_utf8(out_buf)?)178	}179180	fn inherit_stdio(&mut self) -> &mut Self {181		self.stderr(Stdio::inherit());182		self183	}184185	async fn run(&mut self) -> Result<()> {186		self.inherit_stdio();187		let out = self.output().await?;188		if !out.status.success() {189			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);190		}191		Ok(())192	}193194	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {195		let str = self.run_string().await?;196		serde_json::from_str(&str).with_context(|| format!("{:?}", str))197	}198199	async fn run_string(&mut self) -> Result<String> {200		self.inherit_stdio();201		let out = self.output().await?;202		if !out.status.success() {203			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);204		}205		Ok(String::from_utf8(out.stdout)?)206	}207208	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {209		let mut cmd = Command::new("ssh");210		cmd.arg(host).arg("--").arg(command);211		cmd212	}213}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -16,6 +16,7 @@
 use crate::{command::CommandExt, fleetdata::FleetData};
 
 pub struct FleetConfigInternals {
+	pub local_system: String,
 	pub directory: PathBuf,
 	pub opts: FleetOpts,
 	pub data: RefCell<FleetData>,
@@ -66,17 +67,21 @@
 		}
 	}
 
-	pub fn full_attr_name(&self, attr_name: &str) -> OsString {
+	pub fn configuration_attr_name(&self, name: &str) -> OsString {
 		let mut str = self.directory.as_os_str().to_owned();
 		str.push("#");
-		str.push(attr_name);
+		str.push(&format!(
+			"fleetConfigurations.default.{}.{}",
+			self.local_system,
+			name
+		));
 		str
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<String>> {
 		Command::new("nix")
 			.arg("eval")
-			.arg(self.full_attr_name("fleetConfigurations.default.configuredHosts"))
+			.arg(self.configuration_attr_name("configuredHosts"))
 			.args(&["--apply", "builtins.attrNames", "--json", "--show-trace"])
 			.run_nix_json()
 			.await
@@ -84,10 +89,7 @@
 	pub async fn config_attr<T: DeserializeOwned>(&self, host: &str, attr: &str) -> Result<T> {
 		Command::new("nix")
 			.arg("eval")
-			.arg(self.full_attr_name(&format!(
-				"fleetConfigurations.default.configuredSystems.{}.config.{}",
-				host, attr
-			)))
+			.arg(self.configuration_attr_name(&format!("configuredSystems.{}.config.{}", host, attr)))
 			.args(&["--json", "--show-trace"])
 			.run_nix_json()
 			.await
@@ -129,10 +131,14 @@
 	/// Host, which should be threaten as current machine
 	#[structopt(long)]
 	pub localhost: Option<String>,
+
+	#[structopt(long, default_value = "x86_64-linux")]
+	pub local_system: String,
 }
 
 impl FleetOpts {
 	pub fn build(mut self) -> Result<Config> {
+		let local_system = self.local_system.clone();
 		if self.localhost.is_none() {
 			self.localhost
 				.replace(hostname::get().unwrap().to_str().unwrap().to_owned());
@@ -148,6 +154,7 @@
 			opts: self,
 			directory,
 			data,
+			local_system,
 		})))
 	}
 }
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,11 +1,7 @@
-#![feature(try_blocks)]
-
 pub mod command;
 pub mod host;
 pub mod keys;
-
 pub mod cmds;
-pub mod nix;
 
 mod fleetdata;
 
deletedcmds/fleet/src/nix.rsdiffbeforeafterboth
--- a/cmds/fleet/src/nix.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub const HOSTS_ATTRIBUTE: &str = ".#fleetConfigurations.default.configuredHosts";
-pub const SECRETS_ATTRIBUTE: &str = ".#fleetConfigurations.default.configuredSecrets";
-pub const SYSTEMS_ATTRIBUTE: &str = ".#fleetConfigurations.default.configuredSystems";
modifiedlib/default.nixdiffbeforeafterboth
--- a/lib/default.nix
+++ b/lib/default.nix
@@ -2,15 +2,16 @@
   fleetConfiguration = { data, nixpkgs, hosts, ... }@allConfig:
     let
       config = builtins.removeAttrs allConfig [ "nixpkgs" "data" ];
+      fleetLib = import ./fleetLib.nix {
+        inherit nixpkgs hosts;
+      };
     in
-    flake-utils.lib.eachDefaultSystem (system: rec {
+    nixpkgs.lib.genAttrs flake-utils.lib.defaultSystems (system: rec {
       root = nixpkgs.lib.evalModules {
         modules = (import ../modules/fleet/_modules.nix) ++ [ config data ];
         specialArgs = {
           inherit nixpkgs;
-          fleet = import ./fleetLib.nix {
-            inherit nixpkgs hosts;
-          };
+          fleet = fleetLib;
         };
       };
       configuredHosts = root.config.hosts;
@@ -27,12 +28,16 @@
                   else [ ]
                 ) ++ [
                   ({ ... }: {
+                    nixpkgs.system = system;
                     nixpkgs.localSystem.system = system;
                     nixpkgs.crossSystem = if system == configuredHosts.${name}.system then null else {
                       system = configuredHosts.${name}.system;
                     };
                   })
                 ];
+                specialArgs = {
+                  fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
+                };
               };
             }
           )