git.delta.rocks / jrsonnet / refs/commits / c0c9b96f77be

difftreelog

refactor split build-systems and deploy commands

Yaroslav Bolyukin2024-01-05parent: #718d88b.patch.diff
in: trunk

10 files changed

modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,3 +5,5 @@
 [workspace.dependencies]
 nixlike = { path = "./crates/nixlike" }
 better-command = { path = "./crates/better-command" }
+uuid = { version = "1.3.3", features = ["v4"] }
+tokio = { version = "1.33.0", features = ["fs", "rt", "macros", "sync", "time", "rt-multi-thread"] }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -8,6 +8,7 @@
 [dependencies]
 nixlike.workspace = true
 better-command.workspace = true
+tokio.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -27,7 +28,6 @@
 	"wrap_help",
 	"unicode",
 ] }
-tokio = { version = "1.33.0", features = ["full"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
 tokio-util = { version = "0.7.10", features = ["codec"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -428,6 +428,7 @@
 		self.used_fields.extend(e.used_fields);
 	}
 
+	#[allow(dead_code)]
 	pub fn session(&self) -> NixSession {
 		let mut session = None;
 		for ele in &self.used_fields {
@@ -444,6 +445,7 @@
 		}
 		session.expect("expr without fields used")
 	}
+	#[allow(dead_code)]
 	pub fn index_attr(&mut self, s: &str) {
 		let escaped = nixlike::serialize(s).expect("string");
 		self.out.push('.');
@@ -559,7 +561,9 @@
 pub enum Index {
 	Var(String),
 	String(String),
+	#[allow(dead_code)]
 	Apply(String),
+	#[allow(dead_code)]
 	Expr(NixExprBuilder),
 	ExprApply(NixExprBuilder),
 	Pipe(NixExprBuilder),
@@ -576,6 +580,7 @@
 	pub fn attr(v: impl AsRef<str>) -> Self {
 		Self::String(v.as_ref().to_owned())
 	}
+	#[allow(dead_code)]
 	pub fn apply(v: impl Serialize) -> Self {
 		let serialized = nixlike::serialize(v).expect("invalid value for apply");
 		Self::Apply(serialized.trim_end().to_owned())
@@ -749,6 +754,7 @@
 			.await
 			.with_context(|| context("as_json", self.0.full_path.as_deref(), &query))
 	}
+	#[allow(dead_code)]
 	pub async fn has_field(&self, name: &str) -> Result<bool> {
 		let id = self.0.value.expect("can't list root fields");
 		let key = nixlike::escape_string(name);
@@ -786,6 +792,7 @@
 			.await
 			.with_context(|| context("type_of", self.0.full_path.as_deref(), &query))
 	}
+	#[allow(dead_code)]
 	pub async fn import(&self) -> Result<Self> {
 		let import = Self::new(self.0.session.clone(), "import").await?;
 		Ok(nix_go!(self | import))
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
6use crate::host::{Config, ConfigHost};6use crate::host::{Config, ConfigHost};
7use crate::nix_go;7use crate::nix_go;
8use anyhow::{anyhow, Result};8use anyhow::{anyhow, Result};
9use clap::Parser;9use clap::{Parser, ValueEnum};
10use itertools::Itertools as _;10use itertools::Itertools as _;
11use tokio::{task::LocalSet, time::sleep};11use tokio::{task::LocalSet, time::sleep};
12use tracing::{error, field, info, info_span, warn, Instrument};12use tracing::{error, field, info, info_span, warn, Instrument};
1313
14#[derive(Parser, Clone)]14#[derive(Parser)]
15pub struct BuildSystems {15pub struct Deploy {
16 /// Disable automatic rollback16 /// Disable automatic rollback
17 #[clap(long)]17 #[clap(long)]
18 disable_rollback: bool,18 disable_rollback: bool,
19 #[clap(subcommand)]
20 subcommand: Subcommand,19 action: DeployAction,
21}20}
2221
22#[derive(ValueEnum, Clone, Copy)]
23enum UploadAction {23enum DeployAction {
24 /// Upload derivation, but do not execute the update.
25 Upload,
26 /// Upload and execute the activation script, old version will be used after reboot.
24 Test,27 Test,
28 /// Upload and set as current system profile, but do not execute activation script.
25 Boot,29 Boot,
30 /// Upload, set current profile, and execute activation script.
26 Switch,31 Switch,
27}32}
33
28impl UploadAction {34impl DeployAction {
29 fn name(&self) -> &'static str {35 pub(crate) fn name(&self) -> Option<&'static str> {
30 match self {36 match self {
37 DeployAction::Upload => None,
31 UploadAction::Test => "test",38 DeployAction::Test => Some("test"),
32 UploadAction::Boot => "boot",39 DeployAction::Boot => Some("boot"),
33 UploadAction::Switch => "switch",40 DeployAction::Switch => Some("switch"),
34 }41 }
35 }42 }
36
45 }51 }
46}52}
47
48enum PackageAction {
49 SdImage,
50 InstallationCd,
51}
52impl PackageAction {
53 fn build_attr(&self) -> String {
54 match self {
55 PackageAction::SdImage => "sdImage".to_owned(),
56 PackageAction::InstallationCd => "isoImage".to_owned(),
57 }
58 }
59}
60
61enum Action {
62 Upload { action: Option<UploadAction> },
63 Package(PackageAction),
64}
65impl Action {
66 fn build_attr(&self) -> String {
67 match self {
68 Action::Upload { .. } => "toplevel".to_owned(),
69 Action::Package(p) => p.build_attr(),
70 }
71 }
72}
73
74impl From<Subcommand> for Action {
75 fn from(s: Subcommand) -> Self {
76 match s {
77 Subcommand::Upload => Self::Upload { action: None },
78 Subcommand::Test => Self::Upload {
79 action: Some(UploadAction::Test),
80 },
81 Subcommand::Boot => Self::Upload {
82 action: Some(UploadAction::Boot),
83 },
84 Subcommand::Switch => Self::Upload {
85 action: Some(UploadAction::Switch),
86 },
87 Subcommand::SdImage => Self::Package(PackageAction::SdImage),
88 Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),
89 }
90 }
91}
9253
93#[derive(Parser, Clone)]54#[derive(Parser, Clone)]
94enum Subcommand {55pub struct BuildSystems {
95 /// Upload, but do not switch56 /// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes
96 Upload,57 /// are "sdImage"/"isoImage", and your configuration may include any other build attributes.
97 /// Upload + switch to built system until reboot58 #[clap(long, default_value = "toplevel")]
98 Test,59 build_attr: String,
99 /// Upload + switch to built system after reboot
100 Boot,
101 /// Upload + test + boot
102 Switch,
103
104 /// Build SD .img image
105 SdImage,
106 /// Build an installation cd ISO image
107 InstallationCd,
108}60}
10961
110struct Generation {62struct Generation {
163 Ok(current)115 Ok(current)
164}116}
165117
166async fn execute_upload(118async fn deploy_task(
167 build: &BuildSystems,
168 action: UploadAction,119 action: DeployAction,
169 host: &ConfigHost,120 host: &ConfigHost,
170 built: PathBuf,121 built: PathBuf,
122 disable_rollback: bool,
171) -> Result<()> {123) -> Result<()> {
172 let mut failed = false;124 let mut failed = false;
173 // TODO: Lockfile, to prevent concurrent system switch?125 // TODO: Lockfile, to prevent concurrent system switch?
174 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback126 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
175 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to127 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
176 // unit name conflict in systemd-run128 // unit name conflict in systemd-run
177 // This code is tied to rollback.nix129 // This code is tied to rollback.nix
178 if !build.disable_rollback {130 if !disable_rollback {
179 let _span = info_span!("preparing").entered();131 let _span = info_span!("preparing").entered();
180 info!("preparing for rollback");132 info!("preparing for rollback");
181 let generation = get_current_generation(host).await?;133 let generation = get_current_generation(host).await?;
235 switch_script.push("bin");187 switch_script.push("bin");
236 switch_script.push("switch-to-configuration");188 switch_script.push("switch-to-configuration");
237 let mut cmd = host.cmd(switch_script).in_current_span().await?;189 let mut cmd = host.cmd(switch_script).in_current_span().await?;
238 cmd.arg(action.name());190 cmd.arg(action.name().expect("upload.should_activate == false"));
239 if let Err(e) = cmd.sudo().run().in_current_span().await {191 if let Err(e) = cmd.sudo().run().in_current_span().await {
240 error!("failed to activate: {e}");192 error!("failed to activate: {e}");
241 failed = true;193 failed = true;
242 }194 }
243 }195 }
244 if !build.disable_rollback {196 if !disable_rollback {
245 if failed {197 if failed {
246 info!("executing rollback");198 info!("executing rollback");
247 if let Err(e) = host199 if let Err(e) = host
280 Ok(())232 Ok(())
281}233}
234
235async fn build_task(config: Config, host: String, build_attr: &str) -> Result<PathBuf> {
236 info!("building");
237 let host = config.host(&host).await?;
238 // let action = Action::from(self.subcommand.clone());
239 let fleet_config = &config.config_field;
240 let drv = nix_go!(
241 fleet_config.hosts[{ &host.name }]
242 .nixosSystem
243 .config
244 .system
245 .build[{ build_attr }]
246 );
247 let outputs = drv.build().await.map_err(|e| {
248 if build_attr == "sdImage" {
249 info!("sd-image build failed");
250 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
251 }
252 e
253 })?;
254 let out_output = outputs
255 .get("out")
256 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
257
258 Ok(out_output.clone())
259}
260
261impl BuildSystems {
262 pub async fn run(self, config: &Config) -> Result<()> {
263 let hosts = config.list_hosts().await?;
264 let set = LocalSet::new();
265 let build_attr = self.build_attr.clone();
266 for host in hosts.into_iter() {
267 if config.should_skip(&host.name) {
268 continue;
269 }
270 let config = config.clone();
271 let span = info_span!("build", host = field::display(&host.name));
272 let hostname = host.name;
273 let build_attr = build_attr.clone();
274 // FIXME: Since the introduction of better-nix-eval,
275 // due to single repl used for builds, hosts are waiting for each other to build,
276 // instead of building concurrently.
277 //
278 // Open multiple repls?
279 //
280 // Create build batcher, which will behave similar to golangs
281 // WaitGroup, and start executing once all the build tasks are scheduled?
282 // This also allows to cleanup build output, as there will be no longer
283 // "waiting for remote machine" messages in the cases when one package is needed for
284 // multiple hosts.
285 set.spawn_local(
286 (async move {
287 let built = match build_task(config, hostname.clone(), &build_attr).await {
288 Ok(path) => path,
289 Err(e) => {
290 error!("failed to deploy host: {}", e);
291 return;
292 }
293 };
294 // TODO: Handle error
295 let mut out = current_dir().expect("cwd exists");
296 out.push(format!("built-{}", hostname));
297
298 info!("linking iso image to {:?}", out);
299 if let Err(e) = symlink(built, out) {
300 error!("failed to symlink: {e}")
301 }
302 })
303 .instrument(span),
304 );
305 }
306 set.await;
307 Ok(())
308 }
309}
282310
283impl BuildSystems {311impl Deploy {
284 async fn build_task(self, config: Config, host: String) -> Result<()> {312 pub async fn run(self, config: &Config) -> Result<()> {
285 info!("building");
286 let host = config.host(&host).await?;313 let hosts = config.list_hosts().await?;
287 let action = Action::from(self.subcommand.clone());314 let set = LocalSet::new();
315 for host in hosts.into_iter() {
316 if config.should_skip(&host.name) {
317 continue;
318 }
288 let fleet_config = &config.config_field;319 let config = config.clone();
289 let drv = nix_go!(320 let span = info_span!("deploy", host = field::display(&host.name));
290 fleet_config.hosts[{ &host.name }].nixosSystem.config.system.build[{ action.build_attr() }]
291 );
292 let outputs = drv.build().await.map_err(|e| {321 let hostname = host.name.clone();
293 if action.build_attr() == "sdImage" {322 // FIXME: Fix repl concurrency (see build-systems)
294 info!("sd-image build failed");323 set.spawn_local(
295 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");324 (async move {
296 }
297 e
298 })?;
299 let out_output = outputs
300 .get("out")
301 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
302
303 match action {325 let built = match build_task(config.clone(), hostname.clone(), "toplevel").await
304 Action::Upload { action } => {326 {
327 Ok(path) => path,
328 Err(e) => {
329 error!("failed to deploy host: {}", e);
330 return;
331 }
332 };
305 if !config.is_local(&host.name) {333 if !config.is_local(&hostname) {
306 info!("uploading system closure");334 info!("uploading system closure");
307 {335 {
308 // TODO: Move to remote_derivation method.336 // TODO: Move to remote_derivation method.
316 .arg("sign")344 .arg("sign")
317 .comparg("--key-file", "/etc/nix/private-key")345 .comparg("--key-file", "/etc/nix/private-key")
318 .arg("-r")346 .arg("-r")
319 .arg(out_output);347 .arg(&built);
320 if let Err(e) = sign.sudo().run_nix().await {348 if let Err(e) = sign.sudo().run_nix().await {
321 warn!("Failed to sign store paths: {e}");349 warn!("Failed to sign store paths: {e}");
322 };350 };
323 }351 }
324 let mut tries = 0;352 let mut tries = 0;
325 loop {353 loop {
326 match host.remote_derivation(out_output).await {354 match host.remote_derivation(&built).await {
327 Ok(remote) => {355 Ok(remote) => {
328 assert!(&remote == out_output, "CA derivations aren't implemented");356 assert!(remote == built, "CA derivations aren't implemented");
329 break;357 break;
330 }358 }
331 Err(e) if tries < 3 => {359 Err(e) if tries < 3 => {
332 tries += 1;360 tries += 1;
333 warn!("Copy failure ({}/3): {}", tries, e);361 warn!("copy failure ({}/3): {}", tries, e);
334 sleep(Duration::from_millis(5000)).await;362 sleep(Duration::from_millis(5000)).await;
335 }363 }
336 Err(e) => return Err(e),364 Err(e) => {
365 error!("upload failed: {e}");
366 return;
367 }
337 }368 }
338 }369 }
339 }370 }
340 if let Some(action) = action {371 if let Err(e) =
341 execute_upload(&self, action, &host, out_output.clone()).await?372 deploy_task(self.action, &host, built, self.disable_rollback).await
342 }373 {
343 }374 error!("activation failed: {e}");
344 Action::Package(PackageAction::SdImage) => {375 }
345 let mut out = current_dir()?;376 })
346 out.push(format!("sd-image-{}", host.name));
347
348 info!("linking sd image to {:?}", out);
349 symlink(out_output, out)?;
350 }
351 Action::Package(PackageAction::InstallationCd) => {
352 let mut out = current_dir()?;
353 out.push(format!("installation-cd-{}", host.name));377 .instrument(span),
354
355 info!("linking iso image to {:?}", out);378 );
356 symlink(out_output, out)?;379 }
357 }
358 };380 set.await;
359 Ok(())381 Ok(())
360 }382 }
361
362 pub async fn run(self, config: &Config) -> Result<()> {
363 let hosts = config.list_hosts().await?;
364 let set = LocalSet::new();
365 let this = &self;
366 for host in hosts.into_iter() {
367 if config.should_skip(&host.name) {
368 continue;
369 }
370 let config = config.clone();
371 let this = this.clone();
372 let span = info_span!("deployment", host = field::display(&host.name));
373 let hostname = host.name;
374 // FIXME: Since the introduction of better-nix-eval,
375 // due to single repl used for builds, hosts are waiting for each other to build,
376 // instead of building concurrently.
377 //
378 // Open multiple repls?
379 //
380 // Create build batcher, which will behave similar to golangs
381 // WaitGroup, and start executing once all the build tasks are scheduled?
382 // This also allows to cleanup build output, as there will be no longer
383 // "waiting for remote machine" messages in the cases when one package is needed for
384 // multiple hosts.
385 set.spawn_local(
386 (async move {
387 match this.build_task(config, hostname).await {
388 Ok(_) => {}
389 Err(e) => {
390 error!("failed to deploy host: {}", e)
391 }
392 }
393 })
394 .instrument(span),
395 );
396 }
397 set.await;
398 Ok(())
399 }
400}383}
401384
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,8 +7,6 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::StreamExt;
-use itertools::Itertools;
 use owo_colors::OwoColorize;
 use serde::Deserialize;
 use std::{
@@ -570,7 +568,7 @@
 					config.replace_shared(
 						name.to_owned(),
 						update_owner_set(
-							&name,
+							name,
 							config,
 							data,
 							secret,
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -14,7 +14,6 @@
 use openssh::SessionBuilder;
 use serde::de::DeserializeOwned;
 use tempfile::NamedTempFile;
-use tracing::instrument;
 
 use crate::{
 	better_nix_eval::{Field, NixSessionPool},
@@ -90,6 +89,7 @@
 		cmd.arg(path);
 		cmd.run_string().await
 	}
+	#[allow(dead_code)]
 	pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
 		let text = self.read_file_text(path).await?;
 		Ok(serde_json::from_str(&text)?)
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -12,14 +12,17 @@
 mod fleetdata;
 
 use std::ffi::OsString;
-use std::io::{stderr, stdout, Write};
 use std::process::exit;
 use std::time::Duration;
 
 use anyhow::{bail, Result};
 use clap::Parser;
 
-use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};
+use cmds::{
+	build_systems::{BuildSystems, Deploy},
+	info::Info,
+	secrets::Secret,
+};
 use futures::future::LocalBoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
@@ -73,6 +76,8 @@
 enum Opts {
 	/// Prepare systems for deployments
 	BuildSystems(BuildSystems),
+
+	Deploy(Deploy),
 	/// Secret management
 	#[clap(subcommand)]
 	Secret(Secret),
@@ -94,6 +99,7 @@
 async fn run_command(config: &Config, command: Opts) -> Result<()> {
 	match command {
 		Opts::BuildSystems(c) => c.run(config).await?,
+		Opts::Deploy(d) => d.run(config).await?,
 		Opts::Secret(s) => s.run(config).await?,
 		Opts::Info(i) => i.run(config).await?,
 		Opts::Prefetch(p) => p.run(config).await?,
modifiedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- a/crates/better-command/src/handler.rs
+++ b/crates/better-command/src/handler.rs
@@ -165,7 +165,7 @@
 								drv = pkg;
 							}
 						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						info!(target: "nix","copying {} {} -> {}", drv, from, to);
 						let span = info_span!("copy", from, to, drv);
 						span.pb_start();
 						self.spans.insert(id, span);
modifiedflake.lockdiffbeforeafterboth
--- a/flake.lock
+++ b/flake.lock
@@ -38,11 +38,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1703974965,
-        "narHash": "sha256-dvZjLuAcLnv25bqStTL2ZICC5YSs8aynF5amRM+I6UM=",
+        "lastModified": 1704409229,
+        "narHash": "sha256-Vc41cRJ3trOnocovLe0zZE35pK5Lfuo/zHk0xx3CNDY=",
         "owner": "nixos",
         "repo": "nixpkgs",
-        "rev": "9f434bd436e2bb5615827469ed651e30c26daada",
+        "rev": "786f788914f2a6e94cedf361541894e972b8fd23",
         "type": "github"
       },
       "original": {
@@ -67,11 +67,11 @@
         ]
       },
       "locked": {
-        "lastModified": 1703902408,
-        "narHash": "sha256-qXdWvu+tlgNjeoz8yQMRKSom6QyRROfgpmeOhwbujqw=",
+        "lastModified": 1704075545,
+        "narHash": "sha256-L3zgOuVKhPjKsVLc3yTm2YJ6+BATyZBury7wnhyc8QU=",
         "owner": "oxalica",
         "repo": "rust-overlay",
-        "rev": "319f57cd2c34348c55970a4bf2b35afe82088681",
+        "rev": "a0df72e106322b67e9c6e591fe870380bd0da0d5",
         "type": "github"
       },
       "original": {
modifiedflake.nixdiffbeforeafterboth
--- a/flake.nix
+++ b/flake.nix
@@ -29,7 +29,7 @@
         llvmPkgs = pkgs.buildPackages.llvmPackages_11;
         rust =
           (pkgs.rustChannelOf {
-            date = "2023-12-29";
+            date = "2024-01-01";
             channel = "nightly";
           })
           .default