From 7c6930a6bff04f23262f02c5f19e32364716b64b Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Fri, 29 Dec 2023 00:39:42 +0000 Subject: [PATCH] refactor: remove shell-outs for ssh --- --- a/Cargo.lock +++ b/Cargo.lock @@ -319,6 +319,18 @@ checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445" [[package]] +name = "better-command" +version = "0.1.0" +dependencies = [ + "once_cell", + "regex", + "serde", + "serde_json", + "tracing", + "tracing-indicatif", +] + +[[package]] name = "bitflags" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -748,6 +760,7 @@ "anyhow", "async-trait", "base64 0.21.5", + "better-command", "chrono", "clap", "futures", @@ -1510,9 +1523,9 @@ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" @@ -1922,6 +1935,10 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] +name = "remowt-agent" +version = "0.1.0" + +[[package]] name = "rnix" version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2105,9 +2122,9 @@ [[package]] name = "serde" -version = "1.0.190" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] @@ -2123,9 +2140,9 @@ [[package]] name = "serde_derive" -version = "1.0.190" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", @@ -2134,9 +2151,9 @@ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -2527,11 +2544,10 @@ [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2539,9 +2555,9 @@ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -2550,9 +2566,9 @@ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -2560,9 +2576,9 @@ [[package]] name = "tracing-indicatif" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8" +checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d" dependencies = [ "indicatif", "tracing", --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,7 @@ [workspace] members = ["crates/*", "cmds/*"] resolver = "2" + +[workspace.dependencies] +nixlike = { path = "./crates/nixlike" } +better-command = { path = "./crates/better-command" } --- a/cmds/fleet/Cargo.toml +++ b/cmds/fleet/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] +nixlike.workspace = true +better-command.workspace = true anyhow = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -15,7 +17,6 @@ hostname = "0.3.1" age-core = "0.9.0" peg = "0.8.2" -nixlike = { path = "../../crates/nixlike" } age = { version = "0.9.2", features = ["ssh", "armor"] } base64 = "0.21.5" chrono = { version = "0.4.31", features = ["serde"] } --- a/cmds/fleet/src/better_nix_eval.rs +++ b/cmds/fleet/src/better_nix_eval.rs @@ -1,3 +1,6 @@ +//! Wrapper around nix repl, which allows to work on nix code, without relying on +//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA. + use std::collections::HashMap; use std::ffi::{OsStr, OsString}; use std::fmt::{self, Display}; @@ -6,6 +9,7 @@ use std::sync::{Arc, OnceLock}; use anyhow::{anyhow, bail, ensure, Context, Result}; +use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler}; use futures::StreamExt; use itertools::Itertools; use r2d2::{Pool, PooledConnection}; @@ -14,11 +18,9 @@ use tokio::io::AsyncWriteExt; use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command}; use tokio::select; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_util::codec::{FramedRead, LinesCodec}; use tracing::{debug, error, warn, Level}; - -use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler}; const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\""; @@ -30,6 +32,8 @@ string_wrapping: (String, String), number_wrapping: (String, String), + executing_command: Arc>, + next_id: u32, free_list: Vec, } @@ -219,6 +223,8 @@ string_wrapping: Default::default(), number_wrapping: Default::default(), + executing_command: Arc::new(Mutex::new(())), + next_id: 0, free_list: vec![], }; @@ -331,6 +337,10 @@ expr: impl AsRef<[u8]>, err_handler: &mut dyn Handler, ) -> Result { + // Prevent two commands from being executed in parallel, messing with each other. + let _lock = self.executing_command.clone(); + let _guard = _lock.lock().await; + self.send_command(expr).await?; // It will be echoed self.send_command(REPL_DELIMITER).await?; --- a/cmds/fleet/src/cmds/build_systems.rs +++ b/cmds/fleet/src/cmds/build_systems.rs @@ -3,11 +3,11 @@ use std::{env::current_dir, time::Duration}; use crate::command::MyCommand; -use crate::host::Config; +use crate::host::{Config, ConfigHost}; use crate::nix_go; use anyhow::{anyhow, Result}; use clap::Parser; -use itertools::Itertools; +use itertools::Itertools as _; use tokio::{task::LocalSet, time::sleep}; use tracing::{error, field, info, info_span, warn, Instrument}; @@ -112,12 +112,12 @@ current: bool, datetime: String, } -async fn get_current_generation(config: &Config, host: &str) -> Result { - let mut cmd = MyCommand::new("nix-env"); +async fn get_current_generation(host: &ConfigHost) -> Result { + let mut cmd = host.cmd("nix-env").await?; cmd.comparg("--profile", "/nix/var/nix/profiles/system") .arg("--list-generations"); // Sudo is required due to --list-generations acquiring lock on the profile. - let data = config.run_string_on(host, cmd, true).await?; + let data = cmd.sudo().run_string().await?; let generations = data .split('\n') .map(|e| e.trim()) @@ -161,25 +161,12 @@ .map_err(|_e| anyhow!("bad list-generations output"))? .ok_or_else(|| anyhow!("failed to find generation"))?; Ok(current) -} - -async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> { - let mut cmd = MyCommand::new("systemctl"); - cmd.arg("stop").arg(unit); - config.run_on(host, cmd, true).await -} - -async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> { - let mut cmd = MyCommand::new("systemctl"); - cmd.arg("start").arg(unit); - config.run_on(host, cmd, true).await } async fn execute_upload( build: &BuildSystems, - config: &Config, action: UploadAction, - host: &str, + host: &ConfigHost, built: PathBuf, ) -> Result<()> { let mut failed = false; @@ -191,15 +178,15 @@ if !build.disable_rollback { let _span = info_span!("preparing").entered(); info!("preparing for rollback"); - let generation = get_current_generation(config, host).await?; + let generation = get_current_generation(&host).await?; info!( "rollback target would be {} {}", generation.id, generation.datetime ); { - let mut cmd = MyCommand::new("sh"); + let mut cmd = host.cmd("sh").await?; cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id)); - if let Err(e) = config.run_on(host, cmd, true).await { + if let Err(e) = cmd.sudo().run().await { error!("failed to set rollback marker: {e}"); failed = true; } @@ -215,37 +202,41 @@ // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something. // Anyway, reboot will still help in this case. if action.should_schedule_rollback_run() { - let mut cmd = MyCommand::new("systemd-run"); + let mut cmd = host.cmd("systemd-run").await?; cmd.comparg("--on-active", "3min") .comparg("--unit", "rollback-watchdog-run") .arg("systemctl") .arg("start") .arg("rollback-watchdog.service"); - if let Err(e) = config.run_on(host, cmd, true).await { + if let Err(e) = cmd.sudo().run().await { error!("failed to schedule rollback run: {e}"); failed = true; } } } + if action.should_switch_profile() && !failed { info!("switching generation"); - let mut cmd = MyCommand::new("nix-env"); + let mut cmd = host.cmd("nix-env").await?; cmd.comparg("--profile", "/nix/var/nix/profiles/system") .comparg("--set", &built); - if let Err(e) = config.run_on(host, cmd, true).await { + if let Err(e) = cmd.sudo().run().await { error!("failed to switch generation: {e}"); failed = true; } } + + // FIXME: Connection might be disconnected after activation run + if action.should_activate() && !failed { let _span = info_span!("activating").entered(); info!("executing activation script"); let mut switch_script = built.clone(); switch_script.push("bin"); switch_script.push("switch-to-configuration"); - let mut cmd = MyCommand::new(switch_script); + let mut cmd = host.cmd(switch_script).await?; cmd.arg(action.name()); - if let Err(e) = config.run_on(host, cmd, true).in_current_span().await { + if let Err(e) = cmd.sudo().run().in_current_span().await { error!("failed to activate: {e}"); failed = true; } @@ -253,7 +244,8 @@ if !build.disable_rollback { if failed { info!("executing rollback"); - if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service") + if let Err(e) = host + .systemctl_start("rollback-watchdog.service") .instrument(info_span!("rollback")) .await { @@ -261,27 +253,29 @@ } } else { info!("trying to mark upgrade as successful"); - let mut cmd = MyCommand::new("rm"); - cmd.arg("-f").arg("/etc/fleet_rollback_marker"); - if let Err(e) = config.run_on(host, cmd, true).in_current_span().await { + if let Err(e) = host + .rm_file("/etc/fleet_rollback_marker", true) + .in_current_span() + .await + { error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}") } } info!("disarming watchdog, just in case"); - if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await { + if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await { // It is ok, if there was no reboot - then timer might not be running. } if action.should_schedule_rollback_run() { - if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await { + if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await { error!("failed to disarm rollback run: {e}"); } } - } else { - let mut cmd = MyCommand::new("rm"); - cmd.arg("-f").arg("/etc/fleet_rollback_marker"); - if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await { - // Marker might not exist, yet better try to remove it. - } + } else if let Err(_e) = host + .rm_file("/etc/fleet_rollback_marker", true) + .in_current_span() + .await + { + // Marker might not exist, yet better try to remove it. } Ok(()) } @@ -289,12 +283,13 @@ impl BuildSystems { async fn build_task(self, config: Config, host: String) -> Result<()> { info!("building"); + let host = config.host(&host).await?; let action = Action::from(self.subcommand.clone()); let fleet_field = &config.fleet_field; let drv = nix_go!( fleet_field.buildSystems(Obj { localSystem: { config.local_system.clone() } - })[{ action.build_attr() }][{ host }] + })[{ action.build_attr() }][{ &host.name }] ); let outputs = drv.build().await.map_err(|e| { if action.build_attr() == "sdImage" { @@ -309,9 +304,10 @@ match action { Action::Upload { action } => { - if !config.is_local(&host) { + if !config.is_local(&host.name) { info!("uploading system closure"); { + // TODO: Move to remote_derivation method. // Alternatively, nix store make-content-addressed can be used, // at least for the first deployment, to provide trusted store key. // @@ -329,13 +325,11 @@ } let mut tries = 0; loop { - let mut nix = MyCommand::new("nix"); - nix.arg("copy") - .arg("--substitute-on-destination") - .comparg("--to", format!("ssh-ng://{host}")) - .arg(out_output); - match nix.run_nix().await { - Ok(()) => break, + match host.remote_derivation(out_output).await { + Ok(remote) => { + assert!(&remote == out_output, "CA derivations aren't implemented"); + break; + } Err(e) if tries < 3 => { tries += 1; warn!("Copy failure ({}/3): {}", tries, e); @@ -346,19 +340,19 @@ } } if let Some(action) = action { - execute_upload(&self, &config, action, &host, out_output.clone()).await? + execute_upload(&self, action, &host, out_output.clone()).await? } } Action::Package(PackageAction::SdImage) => { let mut out = current_dir()?; - out.push(format!("sd-image-{}", host)); + out.push(format!("sd-image-{}", host.name)); info!("linking sd image to {:?}", out); symlink(out_output, out)?; } Action::Package(PackageAction::InstallationCd) => { let mut out = current_dir()?; - out.push(format!("installation-cd-{}", host)); + out.push(format!("installation-cd-{}", host.name)); info!("linking iso image to {:?}", out); symlink(out_output, out)?; @@ -379,6 +373,17 @@ let this = this.clone(); let span = info_span!("deployment", host = field::display(&host.name)); let hostname = host.name; + // FIXME: Since the introduction of better-nix-eval, + // due to single repl used for builds, hosts are waiting for each other to build, + // instead of building concurrently. + // + // Open multiple repls? + // + // Create build batcher, which will behave similar to golangs + // WaitGroup, and start executing once all the build tasks are scheduled? + // This also allows to cleanup build output, as there will be no longer + // "waiting for remote machine" messages in the cases when one package is needed for + // multiple hosts. set.spawn_local( (async move { match this.build_task(config, hostname).await { --- a/cmds/fleet/src/cmds/secrets/mod.rs +++ b/cmds/fleet/src/cmds/secrets/mod.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use chrono::{DateTime, Utc}; use clap::Parser; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use itertools::Itertools; use owo_colors::OwoColorize; use std::{ @@ -404,9 +404,8 @@ target_recipients.into_iter().collect::>>()?; if let Some(data) = secret.secret.secret { - let encrypted = config - .reencrypt_on_host(identity_holder, data, target_recipients) - .await?; + let host = config.host(&identity_holder).await?; + let encrypted = host.reencrypt(data, target_recipients).await?; secret.secret.secret = Some(encrypted); } @@ -481,9 +480,8 @@ target_recipients.into_iter().collect::>>()?; if let Some(secret) = data.secret.secret { - let encrypted = config - .reencrypt_on_host(identity_holder, secret, target_recipients) - .await?; + let host = config.host(identity_holder).await?; + let encrypted = host.reencrypt(secret, target_recipients).await?; data.secret.secret = Some(encrypted); } --- a/cmds/fleet/src/command.rs +++ b/cmds/fleet/src/command.rs @@ -1,23 +1,15 @@ -use std::{ - collections::HashMap, - ffi::OsStr, - pin, - process::Stdio, - sync::{Arc, Mutex}, - task::Poll, -}; +use std::thread::sleep; +use std::time::Duration; +use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll}; use anyhow::{anyhow, Result}; +use better_command::{Handler, NixHandler, PlainHandler}; use futures::StreamExt; use itertools::Either; -use once_cell::sync::Lazy; use openssh::{OverSsh, OwningCommand, Session}; -use regex::Regex; -use serde::{de::Visitor, Deserialize}; use tokio::{io::AsyncRead, process::Command, select}; use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec}; -use tracing::{info, info_span, warn, Span}; -use tracing_indicatif::span_ext::IndicatifSpanExt; +use tracing::{info, debug}; fn escape_bash(input: &str, out: &mut String) { const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}"; @@ -87,9 +79,7 @@ return self; } let mut out = Self::new("env"); - if let Some(session) = self.ssh_session { - out = out.ssh_session(session); - } + out.ssh_session = self.ssh_session; for (k, v) in self.env { assert!(!k.contains('=')); out.arg(format!("{k}={v}")); @@ -179,21 +169,11 @@ out } else { let mut out = Self::new("sudo"); + out.ssh_session = self.ssh_session.take(); out.args(self.into_args()); out } } - pub fn ssh_session(mut self, on: Arc) -> Self { - self.ssh_session = Some(on); - self - } - pub fn ssh(mut self, on: impl AsRef) -> Self { - let mut out = Self::new("ssh"); - out.ssh_session = self.ssh_session.take(); - out.arg(on).arg("--"); - out.arg(self.into_string()); - out - } pub async fn run(self) -> Result<()> { let str = self.clone().into_string(); @@ -276,259 +256,6 @@ let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?; assert!(v.is_none()); Ok(()) -} - -pub trait Handler: Send { - fn handle_line(&mut self, e: &str); -} - -pub struct ClonableHandler(Arc>); -impl Clone for ClonableHandler { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} -impl ClonableHandler { - pub fn new(inner: H) -> Self { - Self(Arc::new(Mutex::new(inner))) - } -} -impl Handler for ClonableHandler { - fn handle_line(&mut self, e: &str) { - self.0.lock().unwrap().handle_line(e) - } -} - -struct PlainHandler; -impl Handler for PlainHandler { - fn handle_line(&mut self, e: &str) { - info!(target: "log", "{e}"); - } -} - -pub struct NoopHandler; -impl Handler for NoopHandler { - fn handle_line(&mut self, _e: &str) {} -} - -#[derive(Default)] -pub struct NixHandler { - spans: HashMap, -} -fn process_message(m: &str) -> String { - static OSC_CLEANER: Lazy = - Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap()); - static DETABBER: Lazy = Lazy::new(|| Regex::new(r"\t").unwrap()); - let m = OSC_CLEANER.replace_all(m, ""); - // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned, - // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line. - DETABBER.replace_all(m.as_ref(), " ").to_string() -} -impl Handler for NixHandler { - fn handle_line(&mut self, e: &str) { - if let Some(e) = e.strip_prefix("@nix ") { - let log: NixLog = match serde_json::from_str(e) { - Ok(l) => l, - Err(err) => { - warn!("failed to parse nix log line {:?}: {}", e, err); - return; - } - }; - match log { - NixLog::Msg { msg, raw_msg, .. } => { - #[allow(clippy::nonminimal_bool)] - if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty")) - && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake") - && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" { - if let Some(raw_msg) = raw_msg { - if !msg.is_empty() { - info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end()) - } else { - info!(target: "nix", "{}", raw_msg.trim_end()) - } - } else { - info!(target: "nix", "{}", msg.trim_end()) - } - } - } - NixLog::Start { - ref fields, - typ, - id, - .. - } if typ == 105 && !fields.is_empty() => { - if let [LogField::String(drv), ..] = &fields[..] { - let mut drv = drv.as_str(); - if let Some(pkg) = drv.strip_prefix("/nix/store/") { - let mut it = pkg.splitn(2, '-'); - it.next(); - if let Some(pkg) = it.next() { - drv = pkg; - } - } - info!(target: "nix","building {}", drv); - let span = info_span!("build", drv); - span.pb_start(); - self.spans.insert(id, span); - } else { - warn!("bad build log: {:?}", log) - } - } - NixLog::Start { - ref fields, - typ, - id, - .. - } if typ == 100 && fields.len() >= 3 => { - if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = - &fields[..] - { - let mut drv = drv.as_str(); - - if let Some(pkg) = drv.strip_prefix("/nix/store/") { - let mut it = pkg.splitn(2, '-'); - it.next(); - if let Some(pkg) = it.next() { - drv = pkg; - } - } - // info!(target: "nix","copying {} {} -> {}", drv, from, to); - let span = info_span!("copy", from, to, drv); - span.pb_start(); - self.spans.insert(id, span); - } else { - warn!("bad copy log: {:?}", log) - } - } - NixLog::Start { text, typ, id, .. } - if typ == 0 || typ == 102 || typ == 103 || typ == 104 => - { - if !text.is_empty() - && text != "querying info about missing paths" - && text != "copying 0 paths" - // Too much spam on lazy-trees branch - && !(text.starts_with("copying '") && text.ends_with("' to the store")) - { - let span = info_span!("job"); - span.pb_start(); - span.pb_set_message(&process_message(text.trim())); - self.spans.insert(id, span); - info!(target: "nix", "{}", text); - } - } - NixLog::Start { - text, - level: 0, - typ: 108, - .. - } if text.is_empty() => { - // Cache lookup? Coupled with copy log - } - NixLog::Start { - text, - level: 4, - typ: 109, - .. - } if text.starts_with("querying info about ") => { - // Cache lookup - } - NixLog::Start { - text, - level: 4, - typ: 101, - .. - } if text.starts_with("downloading ") => { - // NAR downloading, coupled with copy log - } - NixLog::Start { - text, - level: 1, - typ: 111, - .. - } if text.starts_with("waiting for a machine to build ") => { - // Useless repeating notification about build - } - NixLog::Start { - text, - level: 3, - typ: 111, - .. - } if text.starts_with("resolved derivation: ") => { - // CA resolved - } - NixLog::Start { - text, - level: 1, - typ: 111, - id, - .. - } if text.starts_with("waiting for lock on ") => { - let mut drv = text.strip_prefix("waiting for lock on ").unwrap(); - if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") { - drv = txt; - } - if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") { - drv = txt; - } - if let Some(txt) = drv.split("', '").next() { - drv = txt; - } - if let Some(pkg) = drv.strip_prefix("/nix/store/") { - let mut it = pkg.splitn(2, '-'); - it.next(); - if let Some(pkg) = it.next() { - drv = pkg; - } - } - let span = info_span!("waiting on drv", drv); - span.pb_start(); - self.spans.insert(id, span); - // Concurrent build of the same message - } - NixLog::Stop { id, .. } => { - self.spans.remove(&id); - } - NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => { - if let Some(span) = self.spans.get(&id) { - if let LogField::String(s) = &fields[0] { - span.pb_set_message(&process_message(s.trim())); - } else { - warn!("bad fields: {fields:?}"); - } - } else { - warn!("unknown result id: {id} {typ} {fields:?}"); - } - // dbg!(fields, id, typ); - } - NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => { - if let Some(span) = self.spans.get(&id) { - if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] = - &fields[..4] - { - span.pb_set_length(*expected); - span.pb_set_position(*done); - } else { - warn!("bad fields: {fields:?}"); - } - } else { - // warn!("unknown result id: {id} {typ} {fields:?}"); - // Unaccounted progress. - } - // dbg!(fields, id, typ); - } - NixLog::Result { typ, .. } if typ == 104 || typ == 106 => { - // Set phase, expected - } - _ => warn!("unknown log: {:?}", log), - }; - } else { - let e = e.trim(); - if e.starts_with("Failed tcsetattr(TCSADRAIN): ") { - return; - } - info!("{e}") - } - } } async fn run_nix_inner_raw( @@ -540,6 +267,7 @@ ) -> Result>> { cmd.stderr(Stdio::piped()); cmd.stdout(Stdio::piped()); + debug!("running command {cmd:?} on local"); let mut child = cmd.spawn()?; let mut stderr = child.stderr.take().unwrap(); let stdout = child.stdout.take().unwrap(); @@ -600,6 +328,7 @@ err_handler: &mut dyn Handler, mut out_handler: Option<&mut dyn Handler>, ) -> Result>> { + debug!("running command {cmd:?} over ssh"); cmd.stderr(openssh::Stdio::piped()); cmd.stdout(openssh::Stdio::piped()); let mut child = cmd.spawn().await?; @@ -656,77 +385,4 @@ } Ok(out_buf) -} - -pub trait ErrorRecorder: Send { - /// Return true to discard message from logging - fn push_message(&mut self, msg: &str) -> bool; -} - -#[derive(Debug)] -enum LogField { - String(String), - Num(u64), -} - -impl<'de> Deserialize<'de> for LogField { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - struct StringOrNum; - impl<'de> Visitor<'de> for StringOrNum { - type Value = LogField; - - fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "string or unsigned") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - Ok(LogField::String(v.to_owned())) - } - - fn visit_u64(self, v: u64) -> Result - where - E: serde::de::Error, - { - Ok(LogField::Num(v)) - } - } - - deserializer.deserialize_any(StringOrNum) - } -} - -#[derive(Deserialize, Debug)] -#[serde(rename_all = "camelCase", tag = "action")] -#[allow(dead_code)] -enum NixLog { - Msg { - level: u32, - msg: String, - raw_msg: Option, - }, - Start { - id: u64, - level: u32, - #[serde(default)] - fields: Vec, - text: String, - #[serde(rename = "type")] - typ: u32, - }, - Stop { - id: u64, - }, - Result { - id: u64, - #[serde(rename = "type")] - typ: u32, - #[serde(default)] - fields: Vec, - }, } --- a/cmds/fleet/src/host.rs +++ b/cmds/fleet/src/host.rs @@ -9,7 +9,6 @@ sync::{Arc, Mutex, MutexGuard, OnceLock}, }; -use age::Recipient; use anyhow::{anyhow, bail, Context, Result}; use clap::{ArgGroup, Parser}; use openssh::SessionBuilder; @@ -50,10 +49,12 @@ pub struct ConfigHost { pub name: String, + pub local: bool, pub session: OnceLock>, } impl ConfigHost { - pub async fn open_session(&self) -> Result> { + async fn open_session(&self) -> Result> { + assert!(!self.local, "do not open ssh connection to local session"); // FIXME: TOCTOU if let Some(session) = &self.session.get() { return Ok((*session).clone()); @@ -96,8 +97,12 @@ D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}")) } pub async fn cmd(&self, cmd: impl AsRef) -> Result { - let session = self.open_session().await?; - Ok(MyCommand::new_on(cmd, session)) + if self.local { + Ok(MyCommand::new(cmd)) + } else { + let session = self.open_session().await?; + Ok(MyCommand::new_on(cmd, session)) + } } pub async fn decrypt(&self, data: SecretData) -> Result> { @@ -110,8 +115,25 @@ .context("failed to call remote host for decrypt")?; z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?") } + pub async fn reencrypt(&self, data: SecretData, targets: Vec) -> Result { + let mut cmd = self.cmd("fleet-install-secrets").await?; + cmd.arg("reencrypt").eqarg("--secret", data.encode_z85()); + for target in targets { + cmd.eqarg("--targets", target); + } + let encoded = cmd + .sudo() + .run_string() + .await + .context("failed to call remote host for decrypt")?; + SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?") + } /// Returns path for futureproofing, as path might change i.e on conversion to CA pub async fn remote_derivation(&self, path: &PathBuf) -> Result { + if self.local { + // Path is located locally, thus already trusted. + return Ok(path.to_owned()); + } let mut nix = MyCommand::new("nix"); nix.arg("copy") .arg("--substitute-on-destination") @@ -120,6 +142,25 @@ nix.run_nix().await?; Ok(path.to_owned()) } + pub async fn systemctl_stop(&self, name: &str) -> Result<()> { + let mut cmd = self.cmd("systemctl").await?; + cmd.arg("stop").arg(name); + cmd.sudo().run().await + } + pub async fn systemctl_start(&self, name: &str) -> Result<()> { + let mut cmd = self.cmd("systemctl").await?; + cmd.arg("start").arg(name); + cmd.sudo().run().await + } + + pub async fn rm_file(&self, path: impl AsRef, sudo: bool) -> Result<()> { + let mut cmd = self.cmd("rm").await?; + cmd.arg("-f").arg(path); + if sudo { + cmd = cmd.sudo() + } + cmd.run().await + } } impl Config { @@ -134,35 +175,12 @@ } pub fn is_local(&self, host: &str) -> bool { self.opts.localhost.as_ref().map(|s| s as &str) == Some(host) - } - - pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> { - if sudo { - command = command.sudo(); - } - if !self.is_local(host) { - command = command.ssh(host); - } - command.run().await - } - pub async fn run_string_on( - &self, - host: &str, - mut command: MyCommand, - sudo: bool, - ) -> Result { - if sudo { - command = command.sudo(); - } - if !self.is_local(host) { - command = command.ssh(host); - } - command.run_string().await } pub async fn host(&self, name: &str) -> Result { Ok(ConfigHost { name: name.to_owned(), + local: self.is_local(name), session: OnceLock::new(), }) } @@ -172,6 +190,7 @@ let mut out = vec![]; for name in names { out.push(ConfigHost { + local: self.is_local(&name), name, session: OnceLock::new(), }) @@ -225,27 +244,6 @@ let mut data = self.data_mut(); let host_secrets = data.host_secrets.entry(host.to_owned()).or_default(); host_secrets.insert(secret, value); - } - - pub async fn reencrypt_on_host( - &self, - host: &str, - data: SecretData, - targets: Vec, - ) -> Result { - let mut recmd = MyCommand::new("fleet-install-secrets"); - recmd.arg("reencrypt").eqarg("--secret", data.encode_z85()); - for target in targets { - recmd.eqarg("--targets", target); - } - recmd = recmd.sudo().ssh(host); - let encoded = recmd - .run_string() - .await - .context("failed to call remote host for decrypt")? - .trim() - .to_owned(); - SecretData::decode_z85(&encoded) } pub fn host_secret(&self, host: &str, secret: &str) -> Result { --- a/cmds/fleet/src/keys.rs +++ b/cmds/fleet/src/keys.rs @@ -1,6 +1,5 @@ use std::str::FromStr; -use crate::command::MyCommand; use crate::host::Config; use age::Recipient; use anyhow::{anyhow, Result}; @@ -30,10 +29,11 @@ Ok(key) } else { warn!("Loading key for {}", host); - let mut cmd = MyCommand::new("cat"); + let host = self.host(host).await?; + let mut cmd = host.cmd("cat").await?; cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub"); - let key = self.run_string_on(host, cmd, false).await?; - self.update_key(host, key.clone()); + let key = cmd.run_string().await?; + self.update_key(&host.name, key.clone()); Ok(key) } } --- /dev/null +++ b/cmds/remowt-agent/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "remowt-agent" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] --- /dev/null +++ b/cmds/remowt-agent/README.adoc @@ -0,0 +1,16 @@ += Remowt agent + +Working with remote machine programmatically is not always easy. + +Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers? + +Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart? + +What if remote host has not enough tools to implement the functionality you need? + +Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work. + +== Non-targets + +Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size. +Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be. --- /dev/null +++ b/cmds/remowt-agent/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} --- /dev/null +++ b/crates/better-command/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "better-command" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +once_cell = "1.19.0" +regex = "1.10.2" +serde = { version = "1.0.193", features = ["derive"] } +serde_json = "1.0.108" +tracing = "0.1.40" +tracing-indicatif = "0.3.6" --- /dev/null +++ b/crates/better-command/src/handler.rs @@ -0,0 +1,305 @@ +//! Collection of handlers, which transform program-specific stdout format to tracing + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use once_cell::sync::Lazy; +use regex::Regex; +use serde::Deserialize; +use tracing::{Span, info, warn, info_span}; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; + +pub trait Handler: Send { + fn handle_line(&mut self, e: &str); +} + +/// Handler wrapper, which can be cloned. +pub struct ClonableHandler(Arc>); +impl Clone for ClonableHandler { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} +impl ClonableHandler { + pub fn new(inner: H) -> Self { + Self(Arc::new(Mutex::new(inner))) + } +} +impl Handler for ClonableHandler { + fn handle_line(&mut self, e: &str) { + self.0.lock().unwrap().handle_line(e) + } +} + +/// Converts command output to tracing lines +pub struct PlainHandler; +impl Handler for PlainHandler { + fn handle_line(&mut self, e: &str) { + info!(target: "log", "{e}"); + } +} + +/// Ignores output +pub struct NoopHandler; +impl Handler for NoopHandler { + fn handle_line(&mut self, _e: &str) {} +} + +/// Transform nix internal-json logs to tracing spans. +#[derive(Default)] +pub struct NixHandler { + spans: HashMap, +} +#[derive(Deserialize, Debug)] +#[serde(untagged)] +enum LogField { + String(String), + Num(u64), +} + +/// Nix internal-json log line type +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase", tag = "action")] +#[allow(dead_code)] +enum NixLog { + Msg { + level: u32, + msg: String, + raw_msg: Option, + }, + Start { + id: u64, + level: u32, + #[serde(default)] + fields: Vec, + text: String, + #[serde(rename = "type")] + typ: u32, + }, + Stop { + id: u64, + }, + Result { + id: u64, + #[serde(rename = "type")] + typ: u32, + #[serde(default)] + fields: Vec, + }, +} +fn process_message(m: &str) -> String { + // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc. + static OSC_CLEANER: Lazy = + Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap()); + static DETABBER: Lazy = Lazy::new(|| Regex::new(r"\t").unwrap()); + let m = OSC_CLEANER.replace_all(m, ""); + // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned, + // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line. + DETABBER.replace_all(m.as_ref(), " ").to_string() +} +impl Handler for NixHandler { + fn handle_line(&mut self, e: &str) { + if let Some(e) = e.strip_prefix("@nix ") { + let log: NixLog = match serde_json::from_str(e) { + Ok(l) => l, + Err(err) => { + warn!("failed to parse nix log line {:?}: {}", e, err); + return; + } + }; + match log { + NixLog::Msg { msg, raw_msg, .. } => { + #[allow(clippy::nonminimal_bool)] + if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty")) + && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake") + && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" { + if let Some(raw_msg) = raw_msg { + if !msg.is_empty() { + info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end()) + } else { + info!(target: "nix", "{}", raw_msg.trim_end()) + } + } else { + info!(target: "nix", "{}", msg.trim_end()) + } + } + } + NixLog::Start { + ref fields, + typ, + id, + .. + } if typ == 105 && !fields.is_empty() => { + if let [LogField::String(drv), ..] = &fields[..] { + let mut drv = drv.as_str(); + if let Some(pkg) = drv.strip_prefix("/nix/store/") { + let mut it = pkg.splitn(2, '-'); + it.next(); + if let Some(pkg) = it.next() { + drv = pkg; + } + } + info!(target: "nix","building {}", drv); + let span = info_span!("build", drv); + span.pb_start(); + self.spans.insert(id, span); + } else { + warn!("bad build log: {:?}", log) + } + } + NixLog::Start { + ref fields, + typ, + id, + .. + } if typ == 100 && fields.len() >= 3 => { + if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = + &fields[..] + { + let mut drv = drv.as_str(); + + if let Some(pkg) = drv.strip_prefix("/nix/store/") { + let mut it = pkg.splitn(2, '-'); + it.next(); + if let Some(pkg) = it.next() { + drv = pkg; + } + } + // info!(target: "nix","copying {} {} -> {}", drv, from, to); + let span = info_span!("copy", from, to, drv); + span.pb_start(); + self.spans.insert(id, span); + } else { + warn!("bad copy log: {:?}", log) + } + } + NixLog::Start { text, typ, id, .. } + if typ == 0 || typ == 102 || typ == 103 || typ == 104 => + { + if !text.is_empty() + && text != "querying info about missing paths" + && text != "copying 0 paths" + // Too much spam on lazy-trees branch + && !(text.starts_with("copying '") && text.ends_with("' to the store")) + { + let span = info_span!("job"); + span.pb_start(); + span.pb_set_message(&process_message(text.trim())); + self.spans.insert(id, span); + info!(target: "nix", "{}", text); + } + } + NixLog::Start { + text, + level: 0, + typ: 108, + .. + } if text.is_empty() => { + // Cache lookup? Coupled with copy log + } + NixLog::Start { + text, + level: 4, + typ: 109, + .. + } if text.starts_with("querying info about ") => { + // Cache lookup + } + NixLog::Start { + text, + level: 4, + typ: 101, + .. + } if text.starts_with("downloading ") => { + // NAR downloading, coupled with copy log + } + NixLog::Start { + text, + level: 1, + typ: 111, + .. + } if text.starts_with("waiting for a machine to build ") => { + // Useless repeating notification about build + } + NixLog::Start { + text, + level: 3, + typ: 111, + .. + } if text.starts_with("resolved derivation: ") => { + // CA resolved + } + NixLog::Start { + text, + level: 1, + typ: 111, + id, + .. + } if text.starts_with("waiting for lock on ") => { + let mut drv = text.strip_prefix("waiting for lock on ").unwrap(); + if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") { + drv = txt; + } + if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") { + drv = txt; + } + if let Some(txt) = drv.split("', '").next() { + drv = txt; + } + if let Some(pkg) = drv.strip_prefix("/nix/store/") { + let mut it = pkg.splitn(2, '-'); + it.next(); + if let Some(pkg) = it.next() { + drv = pkg; + } + } + let span = info_span!("waiting on drv", drv); + span.pb_start(); + self.spans.insert(id, span); + // Concurrent build of the same message + } + NixLog::Stop { id, .. } => { + self.spans.remove(&id); + } + NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => { + if let Some(span) = self.spans.get(&id) { + if let LogField::String(s) = &fields[0] { + span.pb_set_message(&process_message(s.trim())); + } else { + warn!("bad fields: {fields:?}"); + } + } else { + warn!("unknown result id: {id} {typ} {fields:?}"); + } + // dbg!(fields, id, typ); + } + NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => { + if let Some(span) = self.spans.get(&id) { + if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] = + &fields[..4] + { + span.pb_set_length(*expected); + span.pb_set_position(*done); + } else { + warn!("bad fields: {fields:?}"); + } + } else { + // warn!("unknown result id: {id} {typ} {fields:?}"); + // Unaccounted progress. + } + // dbg!(fields, id, typ); + } + NixLog::Result { typ, .. } if typ == 104 || typ == 106 => { + // Set phase, expected + } + _ => warn!("unknown log: {:?}", log), + }; + } else { + let e = e.trim(); + if e.starts_with("Failed tcsetattr(TCSADRAIN): ") { + return; + } + info!("{e}") + } + } +} --- /dev/null +++ b/crates/better-command/src/lib.rs @@ -0,0 +1,17 @@ +mod handler; +pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler}; + +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +} -- gitstuff