difftreelog
refactor split build-systems and deploy commands
in: trunk
10 files changed
Cargo.tomldiffbeforeafterboth5[workspace.dependencies]5[workspace.dependencies]6nixlike = { path = "./crates/nixlike" }6nixlike = { path = "./crates/nixlike" }7better-command = { path = "./crates/better-command" }7better-command = { path = "./crates/better-command" }8uuid = { version = "1.3.3", features = ["v4"] }9tokio = { version = "1.33.0", features = ["fs", "rt", "macros", "sync", "time", "rt-multi-thread"] }810cmds/fleet/Cargo.tomldiffbeforeafterboth8[dependencies]8[dependencies]9nixlike.workspace = true9nixlike.workspace = true10better-command.workspace = true10better-command.workspace = true11tokio.workspace = true11anyhow = "1.0"12anyhow = "1.0"12serde = { version = "1.0", features = ["derive"] }13serde = { version = "1.0", features = ["derive"] }13serde_json = "1.0"14serde_json = "1.0"27 "wrap_help",28 "wrap_help",28 "unicode",29 "unicode",29] }30] }30tokio = { version = "1.33.0", features = ["full"] }31tracing = "0.1"31tracing = "0.1"32tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }32tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }33tokio-util = { version = "0.7.10", features = ["codec"] }33tokio-util = { version = "0.7.10", features = ["codec"] }cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth428 self.used_fields.extend(e.used_fields);428 self.used_fields.extend(e.used_fields);429 }429 }430430431 #[allow(dead_code)]431 pub fn session(&self) -> NixSession {432 pub fn session(&self) -> NixSession {432 let mut session = None;433 let mut session = None;433 for ele in &self.used_fields {434 for ele in &self.used_fields {444 }445 }445 session.expect("expr without fields used")446 session.expect("expr without fields used")446 }447 }448 #[allow(dead_code)]447 pub fn index_attr(&mut self, s: &str) {449 pub fn index_attr(&mut self, s: &str) {448 let escaped = nixlike::serialize(s).expect("string");450 let escaped = nixlike::serialize(s).expect("string");449 self.out.push('.');451 self.out.push('.');559pub enum Index {561pub enum Index {560 Var(String),562 Var(String),561 String(String),563 String(String),564 #[allow(dead_code)]562 Apply(String),565 Apply(String),566 #[allow(dead_code)]563 Expr(NixExprBuilder),567 Expr(NixExprBuilder),564 ExprApply(NixExprBuilder),568 ExprApply(NixExprBuilder),565 Pipe(NixExprBuilder),569 Pipe(NixExprBuilder),576 pub fn attr(v: impl AsRef<str>) -> Self {580 pub fn attr(v: impl AsRef<str>) -> Self {577 Self::String(v.as_ref().to_owned())581 Self::String(v.as_ref().to_owned())578 }582 }583 #[allow(dead_code)]579 pub fn apply(v: impl Serialize) -> Self {584 pub fn apply(v: impl Serialize) -> Self {580 let serialized = nixlike::serialize(v).expect("invalid value for apply");585 let serialized = nixlike::serialize(v).expect("invalid value for apply");581 Self::Apply(serialized.trim_end().to_owned())586 Self::Apply(serialized.trim_end().to_owned())749 .await754 .await750 .with_context(|| context("as_json", self.0.full_path.as_deref(), &query))755 .with_context(|| context("as_json", self.0.full_path.as_deref(), &query))751 }756 }757 #[allow(dead_code)]752 pub async fn has_field(&self, name: &str) -> Result<bool> {758 pub async fn has_field(&self, name: &str) -> Result<bool> {753 let id = self.0.value.expect("can't list root fields");759 let id = self.0.value.expect("can't list root fields");754 let key = nixlike::escape_string(name);760 let key = nixlike::escape_string(name);786 .await792 .await787 .with_context(|| context("type_of", self.0.full_path.as_deref(), &query))793 .with_context(|| context("type_of", self.0.full_path.as_deref(), &query))788 }794 }795 #[allow(dead_code)]789 pub async fn import(&self) -> Result<Self> {796 pub async fn import(&self) -> Result<Self> {790 let import = Self::new(self.0.session.clone(), "import").await?;797 let import = Self::new(self.0.session.clone(), "import").await?;791 Ok(nix_go!(self | import))798 Ok(nix_go!(self | import))cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth6use crate::host::{Config, ConfigHost};6use crate::host::{Config, ConfigHost};7use crate::nix_go;7use crate::nix_go;8use anyhow::{anyhow, Result};8use anyhow::{anyhow, Result};9use clap::Parser;9use clap::{Parser, ValueEnum};10use itertools::Itertools as _;10use itertools::Itertools as _;11use tokio::{task::LocalSet, time::sleep};11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};12use tracing::{error, field, info, info_span, warn, Instrument};131314#[derive(Parser, Clone)]14#[derive(Parser)]15pub struct BuildSystems {15pub struct Deploy {16 /// Disable automatic rollback16 /// Disable automatic rollback17 #[clap(long)]17 #[clap(long)]18 disable_rollback: bool,18 disable_rollback: bool,19 #[clap(subcommand)]20 subcommand: Subcommand,19 action: DeployAction,21}20}222122#[derive(ValueEnum, Clone, Copy)]23enum UploadAction {23enum DeployAction {24 /// Upload derivation, but do not execute the update.25 Upload,26 /// Upload and execute the activation script, old version will be used after reboot.24 Test,27 Test,28 /// Upload and set as current system profile, but do not execute activation script.25 Boot,29 Boot,30 /// Upload, set current profile, and execute activation script.26 Switch,31 Switch,27}32}3328impl UploadAction {34impl DeployAction {29 fn name(&self) -> &'static str {35 pub(crate) fn name(&self) -> Option<&'static str> {30 match self {36 match self {37 DeployAction::Upload => None,31 UploadAction::Test => "test",38 DeployAction::Test => Some("test"),32 UploadAction::Boot => "boot",39 DeployAction::Boot => Some("boot"),33 UploadAction::Switch => "switch",40 DeployAction::Switch => Some("switch"),34 }41 }35 }42 }3645 }51 }46}52}4748enum PackageAction {49 SdImage,50 InstallationCd,51}52impl PackageAction {53 fn build_attr(&self) -> String {54 match self {55 PackageAction::SdImage => "sdImage".to_owned(),56 PackageAction::InstallationCd => "isoImage".to_owned(),57 }58 }59}6061enum Action {62 Upload { action: Option<UploadAction> },63 Package(PackageAction),64}65impl Action {66 fn build_attr(&self) -> String {67 match self {68 Action::Upload { .. } => "toplevel".to_owned(),69 Action::Package(p) => p.build_attr(),70 }71 }72}7374impl From<Subcommand> for Action {75 fn from(s: Subcommand) -> Self {76 match s {77 Subcommand::Upload => Self::Upload { action: None },78 Subcommand::Test => Self::Upload {79 action: Some(UploadAction::Test),80 },81 Subcommand::Boot => Self::Upload {82 action: Some(UploadAction::Boot),83 },84 Subcommand::Switch => Self::Upload {85 action: Some(UploadAction::Switch),86 },87 Subcommand::SdImage => Self::Package(PackageAction::SdImage),88 Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),89 }90 }91}925393#[derive(Parser, Clone)]54#[derive(Parser, Clone)]94enum Subcommand {55pub struct BuildSystems {95 /// Upload, but do not switch56 /// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes96 Upload,57 /// are "sdImage"/"isoImage", and your configuration may include any other build attributes.97 /// Upload + switch to built system until reboot58 #[clap(long, default_value = "toplevel")]98 Test,59 build_attr: String,99 /// Upload + switch to built system after reboot100 Boot,101 /// Upload + test + boot102 Switch,103104 /// Build SD .img image105 SdImage,106 /// Build an installation cd ISO image107 InstallationCd,108}60}10961110struct Generation {62struct Generation {163 Ok(current)115 Ok(current)164}116}165117166async fn execute_upload(118async fn deploy_task(167 build: &BuildSystems,168 action: UploadAction,119 action: DeployAction,169 host: &ConfigHost,120 host: &ConfigHost,170 built: PathBuf,121 built: PathBuf,122 disable_rollback: bool,171) -> Result<()> {123) -> Result<()> {172 let mut failed = false;124 let mut failed = false;173 // TODO: Lockfile, to prevent concurrent system switch?125 // TODO: Lockfile, to prevent concurrent system switch?174 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback126 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback175 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to127 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to176 // unit name conflict in systemd-run128 // unit name conflict in systemd-run177 // This code is tied to rollback.nix129 // This code is tied to rollback.nix178 if !build.disable_rollback {130 if !disable_rollback {179 let _span = info_span!("preparing").entered();131 let _span = info_span!("preparing").entered();180 info!("preparing for rollback");132 info!("preparing for rollback");181 let generation = get_current_generation(host).await?;133 let generation = get_current_generation(host).await?;235 switch_script.push("bin");187 switch_script.push("bin");236 switch_script.push("switch-to-configuration");188 switch_script.push("switch-to-configuration");237 let mut cmd = host.cmd(switch_script).in_current_span().await?;189 let mut cmd = host.cmd(switch_script).in_current_span().await?;238 cmd.arg(action.name());190 cmd.arg(action.name().expect("upload.should_activate == false"));239 if let Err(e) = cmd.sudo().run().in_current_span().await {191 if let Err(e) = cmd.sudo().run().in_current_span().await {240 error!("failed to activate: {e}");192 error!("failed to activate: {e}");241 failed = true;193 failed = true;242 }194 }243 }195 }244 if !build.disable_rollback {196 if !disable_rollback {245 if failed {197 if failed {246 info!("executing rollback");198 info!("executing rollback");247 if let Err(e) = host199 if let Err(e) = host280 Ok(())232 Ok(())281}233}234235async fn build_task(config: Config, host: String, build_attr: &str) -> Result<PathBuf> {236 info!("building");237 let host = config.host(&host).await?;238 // let action = Action::from(self.subcommand.clone());239 let fleet_config = &config.config_field;240 let drv = nix_go!(241 fleet_config.hosts[{ &host.name }]242 .nixosSystem243 .config244 .system245 .build[{ build_attr }]246 );247 let outputs = drv.build().await.map_err(|e| {248 if build_attr == "sdImage" {249 info!("sd-image build failed");250 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");251 }252 e253 })?;254 let out_output = outputs255 .get("out")256 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;257258 Ok(out_output.clone())259}260261impl BuildSystems {262 pub async fn run(self, config: &Config) -> Result<()> {263 let hosts = config.list_hosts().await?;264 let set = LocalSet::new();265 let build_attr = self.build_attr.clone();266 for host in hosts.into_iter() {267 if config.should_skip(&host.name) {268 continue;269 }270 let config = config.clone();271 let span = info_span!("build", host = field::display(&host.name));272 let hostname = host.name;273 let build_attr = build_attr.clone();274 // FIXME: Since the introduction of better-nix-eval,275 // due to single repl used for builds, hosts are waiting for each other to build,276 // instead of building concurrently.277 //278 // Open multiple repls?279 //280 // Create build batcher, which will behave similar to golangs281 // WaitGroup, and start executing once all the build tasks are scheduled?282 // This also allows to cleanup build output, as there will be no longer283 // "waiting for remote machine" messages in the cases when one package is needed for284 // multiple hosts.285 set.spawn_local(286 (async move {287 let built = match build_task(config, hostname.clone(), &build_attr).await {288 Ok(path) => path,289 Err(e) => {290 error!("failed to deploy host: {}", e);291 return;292 }293 };294 // TODO: Handle error295 let mut out = current_dir().expect("cwd exists");296 out.push(format!("built-{}", hostname));297298 info!("linking iso image to {:?}", out);299 if let Err(e) = symlink(built, out) {300 error!("failed to symlink: {e}")301 }302 })303 .instrument(span),304 );305 }306 set.await;307 Ok(())308 }309}282310283impl BuildSystems {311impl Deploy {284 async fn build_task(self, config: Config, host: String) -> Result<()> {312 pub async fn run(self, config: &Config) -> Result<()> {285 info!("building");286 let host = config.host(&host).await?;313 let hosts = config.list_hosts().await?;287 let action = Action::from(self.subcommand.clone());314 let set = LocalSet::new();315 for host in hosts.into_iter() {316 if config.should_skip(&host.name) {317 continue;318 }288 let fleet_config = &config.config_field;319 let config = config.clone();289 let drv = nix_go!(320 let span = info_span!("deploy", host = field::display(&host.name));290 fleet_config.hosts[{ &host.name }].nixosSystem.config.system.build[{ action.build_attr() }]291 );292 let outputs = drv.build().await.map_err(|e| {321 let hostname = host.name.clone();293 if action.build_attr() == "sdImage" {322 // FIXME: Fix repl concurrency (see build-systems)294 info!("sd-image build failed");323 set.spawn_local(295 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");324 (async move {296 }297 e298 })?;299 let out_output = outputs300 .get("out")301 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;302303 match action {325 let built = match build_task(config.clone(), hostname.clone(), "toplevel").await304 Action::Upload { action } => {326 {327 Ok(path) => path,328 Err(e) => {329 error!("failed to deploy host: {}", e);330 return;331 }332 };305 if !config.is_local(&host.name) {333 if !config.is_local(&hostname) {306 info!("uploading system closure");334 info!("uploading system closure");307 {335 {308 // TODO: Move to remote_derivation method.336 // TODO: Move to remote_derivation method.316 .arg("sign")344 .arg("sign")317 .comparg("--key-file", "/etc/nix/private-key")345 .comparg("--key-file", "/etc/nix/private-key")318 .arg("-r")346 .arg("-r")319 .arg(out_output);347 .arg(&built);320 if let Err(e) = sign.sudo().run_nix().await {348 if let Err(e) = sign.sudo().run_nix().await {321 warn!("Failed to sign store paths: {e}");349 warn!("Failed to sign store paths: {e}");322 };350 };323 }351 }324 let mut tries = 0;352 let mut tries = 0;325 loop {353 loop {326 match host.remote_derivation(out_output).await {354 match host.remote_derivation(&built).await {327 Ok(remote) => {355 Ok(remote) => {328 assert!(&remote == out_output, "CA derivations aren't implemented");356 assert!(remote == built, "CA derivations aren't implemented");329 break;357 break;330 }358 }331 Err(e) if tries < 3 => {359 Err(e) if tries < 3 => {332 tries += 1;360 tries += 1;333 warn!("Copy failure ({}/3): {}", tries, e);361 warn!("copy failure ({}/3): {}", tries, e);334 sleep(Duration::from_millis(5000)).await;362 sleep(Duration::from_millis(5000)).await;335 }363 }336 Err(e) => return Err(e),364 Err(e) => {365 error!("upload failed: {e}");366 return;367 }337 }368 }338 }369 }339 }370 }340 if let Some(action) = action {371 if let Err(e) =341 execute_upload(&self, action, &host, out_output.clone()).await?372 deploy_task(self.action, &host, built, self.disable_rollback).await342 }373 {343 }374 error!("activation failed: {e}");344 Action::Package(PackageAction::SdImage) => {375 }345 let mut out = current_dir()?;376 })346 out.push(format!("sd-image-{}", host.name));347348 info!("linking sd image to {:?}", out);349 symlink(out_output, out)?;350 }351 Action::Package(PackageAction::InstallationCd) => {352 let mut out = current_dir()?;353 out.push(format!("installation-cd-{}", host.name));377 .instrument(span),354355 info!("linking iso image to {:?}", out);378 );356 symlink(out_output, out)?;379 }357 }358 };380 set.await;359 Ok(())381 Ok(())360 }382 }361362 pub async fn run(self, config: &Config) -> Result<()> {363 let hosts = config.list_hosts().await?;364 let set = LocalSet::new();365 let this = &self;366 for host in hosts.into_iter() {367 if config.should_skip(&host.name) {368 continue;369 }370 let config = config.clone();371 let this = this.clone();372 let span = info_span!("deployment", host = field::display(&host.name));373 let hostname = host.name;374 // FIXME: Since the introduction of better-nix-eval,375 // due to single repl used for builds, hosts are waiting for each other to build,376 // instead of building concurrently.377 //378 // Open multiple repls?379 //380 // Create build batcher, which will behave similar to golangs381 // WaitGroup, and start executing once all the build tasks are scheduled?382 // This also allows to cleanup build output, as there will be no longer383 // "waiting for remote machine" messages in the cases when one package is needed for384 // multiple hosts.385 set.spawn_local(386 (async move {387 match this.build_task(config, hostname).await {388 Ok(_) => {}389 Err(e) => {390 error!("failed to deploy host: {}", e)391 }392 }393 })394 .instrument(span),395 );396 }397 set.await;398 Ok(())399 }400}383}401384cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth7use anyhow::{anyhow, bail, ensure, Context, Result};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};8use chrono::{DateTime, Utc};9use clap::Parser;9use clap::Parser;10use futures::StreamExt;11use itertools::Itertools;12use owo_colors::OwoColorize;10use owo_colors::OwoColorize;13use serde::Deserialize;11use serde::Deserialize;14use std::{12use std::{570 config.replace_shared(568 config.replace_shared(571 name.to_owned(),569 name.to_owned(),572 update_owner_set(570 update_owner_set(573 &name,571 name,574 config,572 config,575 data,573 data,576 secret,574 secret,cmds/fleet/src/host.rsdiffbeforeafterboth14use openssh::SessionBuilder;14use openssh::SessionBuilder;15use serde::de::DeserializeOwned;15use serde::de::DeserializeOwned;16use tempfile::NamedTempFile;16use tempfile::NamedTempFile;17use tracing::instrument;181719use crate::{18use crate::{20 better_nix_eval::{Field, NixSessionPool},19 better_nix_eval::{Field, NixSessionPool},90 cmd.arg(path);89 cmd.arg(path);91 cmd.run_string().await90 cmd.run_string().await92 }91 }92 #[allow(dead_code)]93 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {93 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {94 let text = self.read_file_text(path).await?;94 let text = self.read_file_text(path).await?;95 Ok(serde_json::from_str(&text)?)95 Ok(serde_json::from_str(&text)?)cmds/fleet/src/main.rsdiffbeforeafterboth12mod fleetdata;12mod fleetdata;131314use std::ffi::OsString;14use std::ffi::OsString;15use std::io::{stderr, stdout, Write};16use std::process::exit;15use std::process::exit;17use std::time::Duration;16use std::time::Duration;181719use anyhow::{bail, Result};18use anyhow::{bail, Result};20use clap::Parser;19use clap::Parser;212022use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};21use cmds::{22 build_systems::{BuildSystems, Deploy},23 info::Info,24 secrets::Secret,25};23use futures::future::LocalBoxFuture;26use futures::future::LocalBoxFuture;24use futures::stream::FuturesUnordered;27use futures::stream::FuturesUnordered;74 /// Prepare systems for deployments77 /// Prepare systems for deployments75 BuildSystems(BuildSystems),78 BuildSystems(BuildSystems),7980 Deploy(Deploy),76 /// Secret management81 /// Secret management77 #[clap(subcommand)]82 #[clap(subcommand)]78 Secret(Secret),83 Secret(Secret),94async fn run_command(config: &Config, command: Opts) -> Result<()> {99async fn run_command(config: &Config, command: Opts) -> Result<()> {95 match command {100 match command {96 Opts::BuildSystems(c) => c.run(config).await?,101 Opts::BuildSystems(c) => c.run(config).await?,102 Opts::Deploy(d) => d.run(config).await?,97 Opts::Secret(s) => s.run(config).await?,103 Opts::Secret(s) => s.run(config).await?,98 Opts::Info(i) => i.run(config).await?,104 Opts::Info(i) => i.run(config).await?,99 Opts::Prefetch(p) => p.run(config).await?,105 Opts::Prefetch(p) => p.run(config).await?,crates/better-command/src/handler.rsdiffbeforeafterboth165 drv = pkg;165 drv = pkg;166 }166 }167 }167 }168 // info!(target: "nix","copying {} {} -> {}", drv, from, to);168 info!(target: "nix","copying {} {} -> {}", drv, from, to);169 let span = info_span!("copy", from, to, drv);169 let span = info_span!("copy", from, to, drv);170 span.pb_start();170 span.pb_start();171 self.spans.insert(id, span);171 self.spans.insert(id, span);flake.lockdiffbeforeafterboth38 },38 },39 "nixpkgs": {39 "nixpkgs": {40 "locked": {40 "locked": {41 "lastModified": 1703974965,41 "lastModified": 1704409229,42 "narHash": "sha256-dvZjLuAcLnv25bqStTL2ZICC5YSs8aynF5amRM+I6UM=",42 "narHash": "sha256-Vc41cRJ3trOnocovLe0zZE35pK5Lfuo/zHk0xx3CNDY=",43 "owner": "nixos",43 "owner": "nixos",44 "repo": "nixpkgs",44 "repo": "nixpkgs",45 "rev": "9f434bd436e2bb5615827469ed651e30c26daada",45 "rev": "786f788914f2a6e94cedf361541894e972b8fd23",46 "type": "github"46 "type": "github"47 },47 },48 "original": {48 "original": {67 ]67 ]68 },68 },69 "locked": {69 "locked": {70 "lastModified": 1703902408,70 "lastModified": 1704075545,71 "narHash": "sha256-qXdWvu+tlgNjeoz8yQMRKSom6QyRROfgpmeOhwbujqw=",71 "narHash": "sha256-L3zgOuVKhPjKsVLc3yTm2YJ6+BATyZBury7wnhyc8QU=",72 "owner": "oxalica",72 "owner": "oxalica",73 "repo": "rust-overlay",73 "repo": "rust-overlay",74 "rev": "319f57cd2c34348c55970a4bf2b35afe82088681",74 "rev": "a0df72e106322b67e9c6e591fe870380bd0da0d5",75 "type": "github"75 "type": "github"76 },76 },77 "original": {77 "original": {flake.nixdiffbeforeafterboth29 llvmPkgs = pkgs.buildPackages.llvmPackages_11;29 llvmPkgs = pkgs.buildPackages.llvmPackages_11;30 rust =30 rust =31 (pkgs.rustChannelOf {31 (pkgs.rustChannelOf {32 date = "2023-12-29";32 date = "2024-01-01";33 channel = "nightly";33 channel = "nightly";34 })34 })35 .default35 .default