difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth318source = "registry+https://github.com/rust-lang/crates.io-index"318source = "registry+https://github.com/rust-lang/crates.io-index"319checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"319checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"320321[[package]]322name = "better-command"323version = "0.1.0"324dependencies = [325 "once_cell",326 "regex",327 "serde",328 "serde_json",329 "tracing",330 "tracing-indicatif",331]320332321[[package]]333[[package]]322name = "bitflags"334name = "bitflags"748 "anyhow",760 "anyhow",749 "async-trait",761 "async-trait",750 "base64 0.21.5",762 "base64 0.21.5",763 "better-command",751 "chrono",764 "chrono",752 "clap",765 "clap",753 "futures",766 "futures",151015231511[[package]]1524[[package]]1512name = "once_cell"1525name = "once_cell"1513version = "1.18.0"1526version = "1.19.0"1514source = "registry+https://github.com/rust-lang/crates.io-index"1527source = "registry+https://github.com/rust-lang/crates.io-index"1515checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"1528checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"151615291517[[package]]1530[[package]]1518name = "opaque-debug"1531name = "opaque-debug"1921source = "registry+https://github.com/rust-lang/crates.io-index"1934source = "registry+https://github.com/rust-lang/crates.io-index"1922checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"1935checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"19361937[[package]]1938name = "remowt-agent"1939version = "0.1.0"192319401924[[package]]1941[[package]]1925name = "rnix"1942name = "rnix"210521222106[[package]]2123[[package]]2107name = "serde"2124name = "serde"2108version = "1.0.190"2125version = "1.0.193"2109source = "registry+https://github.com/rust-lang/crates.io-index"2126source = "registry+https://github.com/rust-lang/crates.io-index"2110checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"2127checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"2111dependencies = [2128dependencies = [2112 "serde_derive",2129 "serde_derive",2113]2130]212321402124[[package]]2141[[package]]2125name = "serde_derive"2142name = "serde_derive"2126version = "1.0.190"2143version = "1.0.193"2127source = "registry+https://github.com/rust-lang/crates.io-index"2144source = "registry+https://github.com/rust-lang/crates.io-index"2128checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"2145checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"2129dependencies = [2146dependencies = [2130 "proc-macro2",2147 "proc-macro2",2131 "quote",2148 "quote",213421512135[[package]]2152[[package]]2136name = "serde_json"2153name = "serde_json"2137version = "1.0.107"2154version = "1.0.108"2138source = "registry+https://github.com/rust-lang/crates.io-index"2155source = "registry+https://github.com/rust-lang/crates.io-index"2139checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"2156checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"2140dependencies = [2157dependencies = [2141 "itoa",2158 "itoa",2142 "ryu",2159 "ryu",252725442528[[package]]2545[[package]]2529name = "tracing"2546name = "tracing"2530version = "0.1.37"2547version = "0.1.40"2531source = "registry+https://github.com/rust-lang/crates.io-index"2548source = "registry+https://github.com/rust-lang/crates.io-index"2532checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"2549checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"2533dependencies = [2550dependencies = [2534 "cfg-if",2535 "pin-project-lite",2551 "pin-project-lite",2536 "tracing-attributes",2552 "tracing-attributes",2537 "tracing-core",2553 "tracing-core",2538]2554]253925552540[[package]]2556[[package]]2541name = "tracing-attributes"2557name = "tracing-attributes"2542version = "0.1.26"2558version = "0.1.27"2543source = "registry+https://github.com/rust-lang/crates.io-index"2559source = "registry+https://github.com/rust-lang/crates.io-index"2544checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"2560checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"2545dependencies = [2561dependencies = [2546 "proc-macro2",2562 "proc-macro2",2547 "quote",2563 "quote",255025662551[[package]]2567[[package]]2552name = "tracing-core"2568name = "tracing-core"2553version = "0.1.31"2569version = "0.1.32"2554source = "registry+https://github.com/rust-lang/crates.io-index"2570source = "registry+https://github.com/rust-lang/crates.io-index"2555checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"2571checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"2556dependencies = [2572dependencies = [2557 "once_cell",2573 "once_cell",2558 "valuable",2574 "valuable",2559]2575]256025762561[[package]]2577[[package]]2562name = "tracing-indicatif"2578name = "tracing-indicatif"2563version = "0.3.5"2579version = "0.3.6"2564source = "registry+https://github.com/rust-lang/crates.io-index"2580source = "registry+https://github.com/rust-lang/crates.io-index"2565checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"2581checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"2566dependencies = [2582dependencies = [2567 "indicatif",2583 "indicatif",2568 "tracing",2584 "tracing",Cargo.tomldiffbeforeafterboth2members = ["crates/*", "cmds/*"]2members = ["crates/*", "cmds/*"]3resolver = "2"3resolver = "2"45[workspace.dependencies]6nixlike = { path = "./crates/nixlike" }7better-command = { path = "./crates/better-command" }48cmds/fleet/Cargo.tomldiffbeforeafterboth6edition = "2021"6edition = "2021"778[dependencies]8[dependencies]9nixlike.workspace = true10better-command.workspace = true9anyhow = "1.0"11anyhow = "1.0"10serde = { version = "1.0", features = ["derive"] }12serde = { version = "1.0", features = ["derive"] }11serde_json = "1.0"13serde_json = "1.0"15hostname = "0.3.1"17hostname = "0.3.1"16age-core = "0.9.0"18age-core = "0.9.0"17peg = "0.8.2"19peg = "0.8.2"18nixlike = { path = "../../crates/nixlike" }19age = { version = "0.9.2", features = ["ssh", "armor"] }20age = { version = "0.9.2", features = ["ssh", "armor"] }20base64 = "0.21.5"21base64 = "0.21.5"21chrono = { version = "0.4.31", features = ["serde"] }22chrono = { version = "0.4.31", features = ["serde"] }cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.31use std::collections::HashMap;4use std::collections::HashMap;2use std::ffi::{OsStr, OsString};5use std::ffi::{OsStr, OsString};6use std::sync::{Arc, OnceLock};9use std::sync::{Arc, OnceLock};7108use anyhow::{anyhow, bail, ensure, Context, Result};11use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};9use futures::StreamExt;13use futures::StreamExt;10use itertools::Itertools;14use itertools::Itertools;11use r2d2::{Pool, PooledConnection};15use r2d2::{Pool, PooledConnection};14use tokio::io::AsyncWriteExt;18use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;20use tokio::select;17use tokio::sync::{mpsc, oneshot};21use tokio::sync::{mpsc, oneshot, Mutex};18use tokio_util::codec::{FramedRead, LinesCodec};22use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};23use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};222423const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";25const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";242630 string_wrapping: (String, String),32 string_wrapping: (String, String),31 number_wrapping: (String, String),33 number_wrapping: (String, String),3435 executing_command: Arc<Mutex<()>>,323633 next_id: u32,37 next_id: u32,34 free_list: Vec<u32>,38 free_list: Vec<u32>,219 string_wrapping: Default::default(),223 string_wrapping: Default::default(),220 number_wrapping: Default::default(),224 number_wrapping: Default::default(),225226 executing_command: Arc::new(Mutex::new(())),221227222 next_id: 0,228 next_id: 0,223 free_list: vec![],229 free_list: vec![],331 expr: impl AsRef<[u8]>,337 expr: impl AsRef<[u8]>,332 err_handler: &mut dyn Handler,338 err_handler: &mut dyn Handler,333 ) -> Result<String> {339 ) -> Result<String> {340 // Prevent two commands from being executed in parallel, messing with each other.341 let _lock = self.executing_command.clone();342 let _guard = _lock.lock().await;343334 self.send_command(expr).await?;344 self.send_command(expr).await?;335 // It will be echoed345 // It will be echoedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth3use std::{env::current_dir, time::Duration};3use std::{env::current_dir, time::Duration};445use crate::command::MyCommand;5use crate::command::MyCommand;6use crate::host::Config;6use crate::host::{Config, ConfigHost};7use crate::nix_go;7use crate::nix_go;8use anyhow::{anyhow, Result};8use anyhow::{anyhow, Result};9use clap::Parser;9use clap::Parser;10use itertools::Itertools;10use itertools::Itertools as _;11use tokio::{task::LocalSet, time::sleep};11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};12use tracing::{error, field, info, info_span, warn, Instrument};1313112 current: bool,112 current: bool,113 datetime: String,113 datetime: String,114}114}115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {115async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {116 let mut cmd = MyCommand::new("nix-env");116 let mut cmd = host.cmd("nix-env").await?;117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")118 .arg("--list-generations");118 .arg("--list-generations");119 // Sudo is required due to --list-generations acquiring lock on the profile.119 // Sudo is required due to --list-generations acquiring lock on the profile.120 let data = config.run_string_on(host, cmd, true).await?;120 let data = cmd.sudo().run_string().await?;121 let generations = data121 let generations = data122 .split('\n')122 .split('\n')123 .map(|e| e.trim())123 .map(|e| e.trim())163 Ok(current)163 Ok(current)164}164}165166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {167 let mut cmd = MyCommand::new("systemctl");168 cmd.arg("stop").arg(unit);169 config.run_on(host, cmd, true).await170}171172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {173 let mut cmd = MyCommand::new("systemctl");174 cmd.arg("start").arg(unit);175 config.run_on(host, cmd, true).await176}177165178async fn execute_upload(166async fn execute_upload(179 build: &BuildSystems,167 build: &BuildSystems,180 config: &Config,181 action: UploadAction,168 action: UploadAction,182 host: &str,169 host: &ConfigHost,183 built: PathBuf,170 built: PathBuf,184) -> Result<()> {171) -> Result<()> {185 let mut failed = false;172 let mut failed = false;191 if !build.disable_rollback {178 if !build.disable_rollback {192 let _span = info_span!("preparing").entered();179 let _span = info_span!("preparing").entered();193 info!("preparing for rollback");180 info!("preparing for rollback");194 let generation = get_current_generation(config, host).await?;181 let generation = get_current_generation(&host).await?;195 info!(182 info!(196 "rollback target would be {} {}",183 "rollback target would be {} {}",197 generation.id, generation.datetime184 generation.id, generation.datetime198 );185 );199 {186 {200 let mut cmd = MyCommand::new("sh");187 let mut cmd = host.cmd("sh").await?;201 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));188 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));202 if let Err(e) = config.run_on(host, cmd, true).await {189 if let Err(e) = cmd.sudo().run().await {203 error!("failed to set rollback marker: {e}");190 error!("failed to set rollback marker: {e}");204 failed = true;191 failed = true;205 }192 }215 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.202 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.216 // Anyway, reboot will still help in this case.203 // Anyway, reboot will still help in this case.217 if action.should_schedule_rollback_run() {204 if action.should_schedule_rollback_run() {218 let mut cmd = MyCommand::new("systemd-run");205 let mut cmd = host.cmd("systemd-run").await?;219 cmd.comparg("--on-active", "3min")206 cmd.comparg("--on-active", "3min")220 .comparg("--unit", "rollback-watchdog-run")207 .comparg("--unit", "rollback-watchdog-run")221 .arg("systemctl")208 .arg("systemctl")222 .arg("start")209 .arg("start")223 .arg("rollback-watchdog.service");210 .arg("rollback-watchdog.service");224 if let Err(e) = config.run_on(host, cmd, true).await {211 if let Err(e) = cmd.sudo().run().await {225 error!("failed to schedule rollback run: {e}");212 error!("failed to schedule rollback run: {e}");226 failed = true;213 failed = true;227 }214 }228 }215 }229 }216 }217230 if action.should_switch_profile() && !failed {218 if action.should_switch_profile() && !failed {231 info!("switching generation");219 info!("switching generation");232 let mut cmd = MyCommand::new("nix-env");220 let mut cmd = host.cmd("nix-env").await?;233 cmd.comparg("--profile", "/nix/var/nix/profiles/system")221 cmd.comparg("--profile", "/nix/var/nix/profiles/system")234 .comparg("--set", &built);222 .comparg("--set", &built);235 if let Err(e) = config.run_on(host, cmd, true).await {223 if let Err(e) = cmd.sudo().run().await {236 error!("failed to switch generation: {e}");224 error!("failed to switch generation: {e}");237 failed = true;225 failed = true;238 }226 }239 }227 }228229 // FIXME: Connection might be disconnected after activation run230240 if action.should_activate() && !failed {231 if action.should_activate() && !failed {241 let _span = info_span!("activating").entered();232 let _span = info_span!("activating").entered();242 info!("executing activation script");233 info!("executing activation script");243 let mut switch_script = built.clone();234 let mut switch_script = built.clone();244 switch_script.push("bin");235 switch_script.push("bin");245 switch_script.push("switch-to-configuration");236 switch_script.push("switch-to-configuration");246 let mut cmd = MyCommand::new(switch_script);237 let mut cmd = host.cmd(switch_script).await?;247 cmd.arg(action.name());238 cmd.arg(action.name());248 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {239 if let Err(e) = cmd.sudo().run().in_current_span().await {249 error!("failed to activate: {e}");240 error!("failed to activate: {e}");250 failed = true;241 failed = true;251 }242 }252 }243 }253 if !build.disable_rollback {244 if !build.disable_rollback {254 if failed {245 if failed {255 info!("executing rollback");246 info!("executing rollback");256 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")247 if let Err(e) = host248 .systemctl_start("rollback-watchdog.service")257 .instrument(info_span!("rollback"))249 .instrument(info_span!("rollback"))258 .await250 .await259 {251 {260 error!("failed to trigger rollback: {e}")252 error!("failed to trigger rollback: {e}")261 }253 }262 } else {254 } else {263 info!("trying to mark upgrade as successful");255 info!("trying to mark upgrade as successful");264 let mut cmd = MyCommand::new("rm");265 cmd.arg("-f").arg("/etc/fleet_rollback_marker");266 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {256 if let Err(e) = host257 .rm_file("/etc/fleet_rollback_marker", true)258 .in_current_span()259 .await260 {267 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")261 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")268 }262 }269 }263 }270 info!("disarming watchdog, just in case");264 info!("disarming watchdog, just in case");271 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {265 if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {272 // It is ok, if there was no reboot - then timer might not be running.266 // It is ok, if there was no reboot - then timer might not be running.273 }267 }274 if action.should_schedule_rollback_run() {268 if action.should_schedule_rollback_run() {275 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {269 if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {276 error!("failed to disarm rollback run: {e}");270 error!("failed to disarm rollback run: {e}");277 }271 }278 }272 }279 } else {273 } else if let Err(_e) = host280 let mut cmd = MyCommand::new("rm");281 cmd.arg("-f").arg("/etc/fleet_rollback_marker");282 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {274 .rm_file("/etc/fleet_rollback_marker", true)275 .in_current_span()276 .await277 {283 // Marker might not exist, yet better try to remove it.278 // Marker might not exist, yet better try to remove it.284 }279 }285 }286 Ok(())280 Ok(())287}281}288282289impl BuildSystems {283impl BuildSystems {290 async fn build_task(self, config: Config, host: String) -> Result<()> {284 async fn build_task(self, config: Config, host: String) -> Result<()> {291 info!("building");285 info!("building");286 let host = config.host(&host).await?;292 let action = Action::from(self.subcommand.clone());287 let action = Action::from(self.subcommand.clone());293 let fleet_field = &config.fleet_field;288 let fleet_field = &config.fleet_field;294 let drv = nix_go!(289 let drv = nix_go!(295 fleet_field.buildSystems(Obj {290 fleet_field.buildSystems(Obj {296 localSystem: { config.local_system.clone() }291 localSystem: { config.local_system.clone() }297 })[{ action.build_attr() }][{ host }]292 })[{ action.build_attr() }][{ &host.name }]298 );293 );299 let outputs = drv.build().await.map_err(|e| {294 let outputs = drv.build().await.map_err(|e| {300 if action.build_attr() == "sdImage" {295 if action.build_attr() == "sdImage" {309304310 match action {305 match action {311 Action::Upload { action } => {306 Action::Upload { action } => {312 if !config.is_local(&host) {307 if !config.is_local(&host.name) {313 info!("uploading system closure");308 info!("uploading system closure");314 {309 {310 // TODO: Move to remote_derivation method.315 // Alternatively, nix store make-content-addressed can be used,311 // Alternatively, nix store make-content-addressed can be used,316 // at least for the first deployment, to provide trusted store key.312 // at least for the first deployment, to provide trusted store key.317 //313 //329 }325 }330 let mut tries = 0;326 let mut tries = 0;331 loop {327 loop {332 let mut nix = MyCommand::new("nix");333 nix.arg("copy")334 .arg("--substitute-on-destination")335 .comparg("--to", format!("ssh-ng://{host}"))336 .arg(out_output);337 match nix.run_nix().await {328 match host.remote_derivation(out_output).await {338 Ok(()) => break,329 Ok(remote) => {330 assert!(&remote == out_output, "CA derivations aren't implemented");331 break;332 }339 Err(e) if tries < 3 => {333 Err(e) if tries < 3 => {340 tries += 1;334 tries += 1;341 warn!("Copy failure ({}/3): {}", tries, e);335 warn!("Copy failure ({}/3): {}", tries, e);346 }340 }347 }341 }348 if let Some(action) = action {342 if let Some(action) = action {349 execute_upload(&self, &config, action, &host, out_output.clone()).await?343 execute_upload(&self, action, &host, out_output.clone()).await?350 }344 }351 }345 }352 Action::Package(PackageAction::SdImage) => {346 Action::Package(PackageAction::SdImage) => {353 let mut out = current_dir()?;347 let mut out = current_dir()?;354 out.push(format!("sd-image-{}", host));348 out.push(format!("sd-image-{}", host.name));355349356 info!("linking sd image to {:?}", out);350 info!("linking sd image to {:?}", out);357 symlink(out_output, out)?;351 symlink(out_output, out)?;358 }352 }359 Action::Package(PackageAction::InstallationCd) => {353 Action::Package(PackageAction::InstallationCd) => {360 let mut out = current_dir()?;354 let mut out = current_dir()?;361 out.push(format!("installation-cd-{}", host));355 out.push(format!("installation-cd-{}", host.name));362356363 info!("linking iso image to {:?}", out);357 info!("linking iso image to {:?}", out);364 symlink(out_output, out)?;358 symlink(out_output, out)?;379 let this = this.clone();373 let this = this.clone();380 let span = info_span!("deployment", host = field::display(&host.name));374 let span = info_span!("deployment", host = field::display(&host.name));381 let hostname = host.name;375 let hostname = host.name;376 // FIXME: Since the introduction of better-nix-eval,377 // due to single repl used for builds, hosts are waiting for each other to build,378 // instead of building concurrently.379 //380 // Open multiple repls?381 //382 // Create build batcher, which will behave similar to golangs383 // WaitGroup, and start executing once all the build tasks are scheduled?384 // This also allows to cleanup build output, as there will be no longer385 // "waiting for remote machine" messages in the cases when one package is needed for386 // multiple hosts.382 set.spawn_local(387 set.spawn_local(383 (async move {388 (async move {384 match this.build_task(config, hostname).await {389 match this.build_task(config, hostname).await {cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth7use anyhow::{anyhow, bail, ensure, Context, Result};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};8use chrono::{DateTime, Utc};9use clap::Parser;9use clap::Parser;10use futures::{StreamExt, TryStreamExt};10use futures::StreamExt;11use itertools::Itertools;11use itertools::Itertools;12use owo_colors::OwoColorize;12use owo_colors::OwoColorize;13use std::{13use std::{404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;404 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;405405406 if let Some(data) = secret.secret.secret {406 if let Some(data) = secret.secret.secret {407 let host = config.host(&identity_holder).await?;407 let encrypted = config408 let encrypted = host.reencrypt(data, target_recipients).await?;408 .reencrypt_on_host(identity_holder, data, target_recipients)409 .await?;410 secret.secret.secret = Some(encrypted);409 secret.secret.secret = Some(encrypted);411 }410 }481 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;480 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;482481483 if let Some(secret) = data.secret.secret {482 if let Some(secret) = data.secret.secret {483 let host = config.host(identity_holder).await?;484 let encrypted = config484 let encrypted = host.reencrypt(secret, target_recipients).await?;485 .reencrypt_on_host(identity_holder, secret, target_recipients)486 .await?;487485488 data.secret.secret = Some(encrypted);486 data.secret.secret = Some(encrypted);cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{1use std::thread::sleep;2 collections::HashMap,2use std::time::Duration;3 ffi::OsStr,3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};4 pin,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};9410use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};11use futures::StreamExt;7use futures::StreamExt;12use itertools::Either;8use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};9use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};10use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};12use tracing::{info, debug};20use tracing_indicatif::span_ext::IndicatifSpanExt;211322fn escape_bash(input: &str, out: &mut String) {14fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";87 return self;79 return self;88 }80 }89 let mut out = Self::new("env");81 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {82 out.ssh_session = self.ssh_session;91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {83 for (k, v) in self.env {94 assert!(!k.contains('='));84 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));85 out.arg(format!("{k}={v}"));179 out169 out180 } else {170 } else {181 let mut out = Self::new("sudo");171 let mut out = Self::new("sudo");172 out.ssh_session = self.ssh_session.take();182 out.args(self.into_args());173 out.args(self.into_args());183 out174 out184 }175 }185 }176 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();193 out.arg(on).arg("--");194 out.arg(self.into_string());195 out196 }197177198 pub async fn run(self) -> Result<()> {178 pub async fn run(self) -> Result<()> {199 let str = self.clone().into_string();179 let str = self.clone().into_string();278 Ok(())258 Ok(())279}259}280260281pub trait Handler: Send {282 fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287 fn clone(&self) -> Self {288 Self(self.0.clone())289 }290}291impl<H> ClonableHandler<H> {292 pub fn new(inner: H) -> Self {293 Self(Arc::new(Mutex::new(inner)))294 }295}296impl<H: Handler> Handler for ClonableHandler<H> {297 fn handle_line(&mut self, e: &str) {298 self.0.lock().unwrap().handle_line(e)299 }300}301302struct PlainHandler;303impl Handler for PlainHandler {304 fn handle_line(&mut self, e: &str) {305 info!(target: "log", "{e}");306 }307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311 fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316 spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319 static OSC_CLEANER: Lazy<Regex> =320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322 let m = OSC_CLEANER.replace_all(m, "");323 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325 DETABBER.replace_all(m.as_ref(), " ").to_string()326}327impl Handler for NixHandler {328 fn handle_line(&mut self, e: &str) {329 if let Some(e) = e.strip_prefix("@nix ") {330 let log: NixLog = match serde_json::from_str(e) {331 Ok(l) => l,332 Err(err) => {333 warn!("failed to parse nix log line {:?}: {}", e, err);334 return;335 }336 };337 match log {338 NixLog::Msg { msg, raw_msg, .. } => {339 #[allow(clippy::nonminimal_bool)]340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343 if let Some(raw_msg) = raw_msg {344 if !msg.is_empty() {345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346 } else {347 info!(target: "nix", "{}", raw_msg.trim_end())348 }349 } else {350 info!(target: "nix", "{}", msg.trim_end())351 }352 }353 }354 NixLog::Start {355 ref fields,356 typ,357 id,358 ..359 } if typ == 105 && !fields.is_empty() => {360 if let [LogField::String(drv), ..] = &fields[..] {361 let mut drv = drv.as_str();362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {363 let mut it = pkg.splitn(2, '-');364 it.next();365 if let Some(pkg) = it.next() {366 drv = pkg;367 }368 }369 info!(target: "nix","building {}", drv);370 let span = info_span!("build", drv);371 span.pb_start();372 self.spans.insert(id, span);373 } else {374 warn!("bad build log: {:?}", log)375 }376 }377 NixLog::Start {378 ref fields,379 typ,380 id,381 ..382 } if typ == 100 && fields.len() >= 3 => {383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384 &fields[..]385 {386 let mut drv = drv.as_str();387388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {389 let mut it = pkg.splitn(2, '-');390 it.next();391 if let Some(pkg) = it.next() {392 drv = pkg;393 }394 }395 // info!(target: "nix","copying {} {} -> {}", drv, from, to);396 let span = info_span!("copy", from, to, drv);397 span.pb_start();398 self.spans.insert(id, span);399 } else {400 warn!("bad copy log: {:?}", log)401 }402 }403 NixLog::Start { text, typ, id, .. }404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405 {406 if !text.is_empty()407 && text != "querying info about missing paths"408 && text != "copying 0 paths"409 // Too much spam on lazy-trees branch410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))411 {412 let span = info_span!("job");413 span.pb_start();414 span.pb_set_message(&process_message(text.trim()));415 self.spans.insert(id, span);416 info!(target: "nix", "{}", text);417 }418 }419 NixLog::Start {420 text,421 level: 0,422 typ: 108,423 ..424 } if text.is_empty() => {425 // Cache lookup? Coupled with copy log426 }427 NixLog::Start {428 text,429 level: 4,430 typ: 109,431 ..432 } if text.starts_with("querying info about ") => {433 // Cache lookup434 }435 NixLog::Start {436 text,437 level: 4,438 typ: 101,439 ..440 } if text.starts_with("downloading ") => {441 // NAR downloading, coupled with copy log442 }443 NixLog::Start {444 text,445 level: 1,446 typ: 111,447 ..448 } if text.starts_with("waiting for a machine to build ") => {449 // Useless repeating notification about build450 }451 NixLog::Start {452 text,453 level: 3,454 typ: 111,455 ..456 } if text.starts_with("resolved derivation: ") => {457 // CA resolved458 }459 NixLog::Start {460 text,461 level: 1,462 typ: 111,463 id,464 ..465 } if text.starts_with("waiting for lock on ") => {466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468 drv = txt;469 }470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471 drv = txt;472 }473 if let Some(txt) = drv.split("', '").next() {474 drv = txt;475 }476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {477 let mut it = pkg.splitn(2, '-');478 it.next();479 if let Some(pkg) = it.next() {480 drv = pkg;481 }482 }483 let span = info_span!("waiting on drv", drv);484 span.pb_start();485 self.spans.insert(id, span);486 // Concurrent build of the same message487 }488 NixLog::Stop { id, .. } => {489 self.spans.remove(&id);490 }491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492 if let Some(span) = self.spans.get(&id) {493 if let LogField::String(s) = &fields[0] {494 span.pb_set_message(&process_message(s.trim()));495 } else {496 warn!("bad fields: {fields:?}");497 }498 } else {499 warn!("unknown result id: {id} {typ} {fields:?}");500 }501 // dbg!(fields, id, typ);502 }503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504 if let Some(span) = self.spans.get(&id) {505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506 &fields[..4]507 {508 span.pb_set_length(*expected);509 span.pb_set_position(*done);510 } else {511 warn!("bad fields: {fields:?}");512 }513 } else {514 // warn!("unknown result id: {id} {typ} {fields:?}");515 // Unaccounted progress.516 }517 // dbg!(fields, id, typ);518 }519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520 // Set phase, expected521 }522 _ => warn!("unknown log: {:?}", log),523 };524 } else {525 let e = e.trim();526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527 return;528 }529 info!("{e}")530 }531 }532}533534async fn run_nix_inner_raw(261async fn run_nix_inner_raw(535 str: String,262 str: String,540) -> Result<Option<Vec<u8>>> {267) -> Result<Option<Vec<u8>>> {541 cmd.stderr(Stdio::piped());268 cmd.stderr(Stdio::piped());542 cmd.stdout(Stdio::piped());269 cmd.stdout(Stdio::piped());270 debug!("running command {cmd:?} on local");543 let mut child = cmd.spawn()?;271 let mut child = cmd.spawn()?;544 let mut stderr = child.stderr.take().unwrap();272 let mut stderr = child.stderr.take().unwrap();545 let stdout = child.stdout.take().unwrap();273 let stdout = child.stdout.take().unwrap();600 err_handler: &mut dyn Handler,328 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,329 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {330) -> Result<Option<Vec<u8>>> {331 debug!("running command {cmd:?} over ssh");603 cmd.stderr(openssh::Stdio::piped());332 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());333 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;334 let mut child = cmd.spawn().await?;656 }385 }657386658 Ok(out_buf)387 Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662 /// Return true to discard message from logging663 fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668 String(String),669 Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674 where675 D: serde::Deserializer<'de>,676 {677 struct StringOrNum;678 impl<'de> Visitor<'de> for StringOrNum {679 type Value = LogField;680681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682 write!(f, "string or unsigned")683 }684685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686 where687 E: serde::de::Error,688 {689 Ok(LogField::String(v.to_owned()))690 }691692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693 where694 E: serde::de::Error,695 {696 Ok(LogField::Num(v))697 }698 }699700 deserializer.deserialize_any(StringOrNum)701 }702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708 Msg {709 level: u32,710 msg: String,711 raw_msg: Option<String>,712 },713 Start {714 id: u64,715 level: u32,716 #[serde(default)]717 fields: Vec<LogField>,718 text: String,719 #[serde(rename = "type")]720 typ: u32,721 },722 Stop {723 id: u64,724 },725 Result {726 id: u64,727 #[serde(rename = "type")]728 typ: u32,729 #[serde(default)]730 fields: Vec<LogField>,731 },732}388}733389cmds/fleet/src/host.rsdiffbeforeafterboth9 sync::{Arc, Mutex, MutexGuard, OnceLock},9 sync::{Arc, Mutex, MutexGuard, OnceLock},10};10};111112use age::Recipient;13use anyhow::{anyhow, bail, Context, Result};12use anyhow::{anyhow, bail, Context, Result};14use clap::{ArgGroup, Parser};13use clap::{ArgGroup, Parser};15use openssh::SessionBuilder;14use openssh::SessionBuilder;504951pub struct ConfigHost {50pub struct ConfigHost {52 pub name: String,51 pub name: String,52 pub local: bool,53 pub session: OnceLock<Arc<openssh::Session>>,53 pub session: OnceLock<Arc<openssh::Session>>,54}54}55impl ConfigHost {55impl ConfigHost {56 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {56 async fn open_session(&self) -> Result<Arc<openssh::Session>> {57 assert!(!self.local, "do not open ssh connection to local session");57 // FIXME: TOCTOU58 // FIXME: TOCTOU58 if let Some(session) = &self.session.get() {59 if let Some(session) = &self.session.get() {59 return Ok((*session).clone());60 return Ok((*session).clone());96 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97 }98 }98 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {99 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {100 if self.local {101 Ok(MyCommand::new(cmd))102 } else {99 let session = self.open_session().await?;103 let session = self.open_session().await?;100 Ok(MyCommand::new_on(cmd, session))104 Ok(MyCommand::new_on(cmd, session))101 }105 }106 }102107103 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {108 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {110 .context("failed to call remote host for decrypt")?;115 .context("failed to call remote host for decrypt")?;111 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")116 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")112 }117 }118 pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {119 let mut cmd = self.cmd("fleet-install-secrets").await?;120 cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());121 for target in targets {122 cmd.eqarg("--targets", target);123 }124 let encoded = cmd125 .sudo()126 .run_string()127 .await128 .context("failed to call remote host for decrypt")?;129 SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")130 }113 /// Returns path for futureproofing, as path might change i.e on conversion to CA131 /// Returns path for futureproofing, as path might change i.e on conversion to CA114 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {132 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {133 if self.local {134 // Path is located locally, thus already trusted.135 return Ok(path.to_owned());136 }115 let mut nix = MyCommand::new("nix");137 let mut nix = MyCommand::new("nix");116 nix.arg("copy")138 nix.arg("copy")117 .arg("--substitute-on-destination")139 .arg("--substitute-on-destination")120 nix.run_nix().await?;142 nix.run_nix().await?;121 Ok(path.to_owned())143 Ok(path.to_owned())122 }144 }145 pub async fn systemctl_stop(&self, name: &str) -> Result<()> {146 let mut cmd = self.cmd("systemctl").await?;147 cmd.arg("stop").arg(name);148 cmd.sudo().run().await149 }150 pub async fn systemctl_start(&self, name: &str) -> Result<()> {151 let mut cmd = self.cmd("systemctl").await?;152 cmd.arg("start").arg(name);153 cmd.sudo().run().await154 }155156 pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {157 let mut cmd = self.cmd("rm").await?;158 cmd.arg("-f").arg(path);159 if sudo {160 cmd = cmd.sudo()161 }162 cmd.run().await163 }123}164}124165125impl Config {166impl Config {136 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)177 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)137 }178 }138139 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {140 if sudo {141 command = command.sudo();142 }143 if !self.is_local(host) {144 command = command.ssh(host);145 }146 command.run().await147 }148 pub async fn run_string_on(149 &self,150 host: &str,151 mut command: MyCommand,152 sudo: bool,153 ) -> Result<String> {154 if sudo {155 command = command.sudo();156 }157 if !self.is_local(host) {158 command = command.ssh(host);159 }160 command.run_string().await161 }162179163 pub async fn host(&self, name: &str) -> Result<ConfigHost> {180 pub async fn host(&self, name: &str) -> Result<ConfigHost> {164 Ok(ConfigHost {181 Ok(ConfigHost {165 name: name.to_owned(),182 name: name.to_owned(),183 local: self.is_local(name),166 session: OnceLock::new(),184 session: OnceLock::new(),167 })185 })168 }186 }172 let mut out = vec![];190 let mut out = vec![];173 for name in names {191 for name in names {174 out.push(ConfigHost {192 out.push(ConfigHost {193 local: self.is_local(&name),175 name,194 name,176 session: OnceLock::new(),195 session: OnceLock::new(),177 })196 })227 host_secrets.insert(secret, value);246 host_secrets.insert(secret, value);228 }247 }229230 pub async fn reencrypt_on_host(231 &self,232 host: &str,233 data: SecretData,234 targets: Vec<String>,235 ) -> Result<SecretData> {236 let mut recmd = MyCommand::new("fleet-install-secrets");237 recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());238 for target in targets {239 recmd.eqarg("--targets", target);240 }241 recmd = recmd.sudo().ssh(host);242 let encoded = recmd243 .run_string()244 .await245 .context("failed to call remote host for decrypt")?246 .trim()247 .to_owned();248 SecretData::decode_z85(&encoded)249 }250248251 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {249 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {252 let data = self.data();250 let data = self.data();cmds/fleet/src/keys.rsdiffbeforeafterboth1use std::str::FromStr;1use std::str::FromStr;223use crate::command::MyCommand;4use crate::host::Config;3use crate::host::Config;5use age::Recipient;4use age::Recipient;6use anyhow::{anyhow, Result};5use anyhow::{anyhow, Result};30 Ok(key)29 Ok(key)31 } else {30 } else {32 warn!("Loading key for {}", host);31 warn!("Loading key for {}", host);32 let host = self.host(host).await?;33 let mut cmd = MyCommand::new("cat");33 let mut cmd = host.cmd("cat").await?;34 cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");34 cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");35 let key = self.run_string_on(host, cmd, false).await?;35 let key = cmd.run_string().await?;36 self.update_key(host, key.clone());36 self.update_key(&host.name, key.clone());37 Ok(key)37 Ok(key)38 }38 }39 }39 }cmds/remowt-agent/Cargo.tomldiffbeforeafterbothno changes
cmds/remowt-agent/README.adocdiffbeforeafterbothno changes
cmds/remowt-agent/src/main.rsdiffbeforeafterbothno changes
crates/better-command/Cargo.tomldiffbeforeafterbothno changes
crates/better-command/src/handler.rsdiffbeforeafterbothno changes
crates/better-command/src/lib.rsdiffbeforeafterbothno changes