git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
 use std::sync::{Arc, OnceLock};
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
 use futures::StreamExt;
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_util::codec::{FramedRead, LinesCodec};
 use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
 
 const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
 
@@ -30,6 +32,8 @@
 	string_wrapping: (String, String),
 	number_wrapping: (String, String),
 
+	executing_command: Arc<Mutex<()>>,
+
 	next_id: u32,
 	free_list: Vec<u32>,
 }
@@ -219,6 +223,8 @@
 			string_wrapping: Default::default(),
 			number_wrapping: Default::default(),
 
+			executing_command: Arc::new(Mutex::new(())),
+
 			next_id: 0,
 			free_list: vec![],
 		};
@@ -331,6 +337,10 @@
 		expr: impl AsRef<[u8]>,
 		err_handler: &mut dyn Handler,
 	) -> Result<String> {
+		// Prevent two commands from being executed in parallel, messing with each other.
+		let _lock = self.executing_command.clone();
+		let _guard = _lock.lock().await;
+
 		self.send_command(expr).await?;
 		// It will be echoed
 		self.send_command(REPL_DELIMITER).await?;
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/secrets/mod.rs
1use crate::{2	better_nix_eval::Field,3	fleetdata::{FleetSecret, FleetSharedSecret, SecretData},4	host::Config,5	nix_go, nix_go_json,6};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};9use clap::Parser;10use futures::{StreamExt, TryStreamExt};11use itertools::Itertools;12use owo_colors::OwoColorize;13use std::{14	collections::HashSet,15	io::{self, Cursor, Read},16	path::PathBuf,17};18use tabled::{Table, Tabled};19use tokio::fs::read_to_string;20use tracing::{info, info_span, warn};2122#[derive(Parser)]23pub enum Secret {24	/// Force load host keys for all defined hosts25	ForceKeys,26	/// Add secret, data should be provided in stdin27	AddShared {28		/// Secret name29		name: String,30		/// Secret owners31		machines: Vec<String>,32		/// Override secret if already present33		#[clap(long)]34		force: bool,35		/// Secret public part36		#[clap(long)]37		public: Option<String>,38		/// Load public part from specified file39		#[clap(long)]40		public_file: Option<PathBuf>,4142		/// Create a notification on secret expiration43		#[clap(long)]44		expires_at: Option<DateTime<Utc>>,4546		/// Secret with this name already exists, override its value while keeping the same owners.47		#[clap(long)]48		re_add: bool,49	},50	/// Add secret, data should be provided in stdin51	Add {52		/// Secret name53		name: String,54		/// Secret owners55		machine: String,56		/// Override secret if already present57		#[clap(long)]58		force: bool,59		#[clap(long)]60		public: Option<String>,61		#[clap(long)]62		public_file: Option<PathBuf>,63	},64	/// Read secret from remote host, requires sudo on said host65	Read {66		name: String,67		machine: String,68		#[clap(long)]69		plaintext: bool,70	},71	UpdateShared {72		name: String,7374		#[clap(long)]75		machines: Option<Vec<String>>,7677		#[clap(long)]78		add_machines: Vec<String>,79		#[clap(long)]80		remove_machines: Vec<String>,8182		/// Which host should we use to decrypt83		#[clap(long)]84		prefer_identities: Vec<String>,85	},86	Regenerate {87		/// Which host should we use to decrypt, in case if reencryption is required, without88		/// regeneration89		#[clap(long)]90		prefer_identities: Vec<String>,91	},92	List {},93}9495async fn generate_shared(96	config: &Config,97	display_name: &str,98	secret: Field,99) -> Result<FleetSharedSecret> {100	Ok(if secret.has_field("generateImpure").await? {101		let config_field = &config.config_unchecked_field;102		let generate = nix_go!(secret.generateImpure);103		let owners: Vec<String> = nix_go_json!(secret.expectedOwners);104105		let on: String = nix_go_json!(generate.on);106		let call_package = nix_go!(107			config_field.buildableSystems(Obj {108				localSystem: { config.local_system.clone() }109			})[{ on }]110			.config111			.nixpkgs112			.resolvedPkgs113			.callPackage114		);115116		let host = config.host(&on).await?;117118		let generator = nix_go!(call_package(generate.generator)(Obj {}));119		let generator = generator.build().await?;120		let generator = generator121			.get("out")122			.ok_or_else(|| anyhow!("missing generateImpure out"))?;123		let generator = host.remote_derivation(generator).await?;124125		let mut recipients = String::new();126		for owner in &owners {127			let key = config.key(owner).await?;128			recipients.push_str(&format!("-r \"{key}\" "));129		}130		recipients.push_str("-e");131132		let out = host.mktemp_dir().await?;133134		let mut gen = host.cmd(generator).await?;135		gen.env("rageArgs", recipients).env("out", &out);136		gen.run().await?;137138		{139			let marker = host.read_file_text(format!("{out}/marker")).await?;140			ensure!(marker == "SUCCESS", "generation not succeeded");141		}142143		let public = host.read_file_text(format!("{out}/public")).await.ok();144		let secret = host.read_file_bin(format!("{out}/secret")).await.ok();145		if let Some(secret) = &secret {146			ensure!(147				age::Decryptor::new(Cursor::new(&secret)).is_ok(),148				"builder produced non-encrypted value as secret, this is highly insecure"149			);150		}151152		let created_at = host.read_file_value(format!("{out}/created_at")).await?;153		let expires_at = host.read_file_value(format!("{out}/expires_at")).await.ok();154155		FleetSharedSecret {156			owners,157			secret: FleetSecret {158				created_at,159				expires_at,160				public,161				secret: secret.map(SecretData),162			},163		}164	} else {165		bail!("no generator defined for {display_name}")166	})167}168169async fn parse_public(170	public: Option<String>,171	public_file: Option<PathBuf>,172) -> Result<Option<String>> {173	Ok(match (public, public_file) {174		(Some(v), None) => Some(v),175		(None, Some(v)) => Some(read_to_string(v).await?),176		(Some(_), Some(_)) => {177			bail!("only public or public_file should be set")178		}179		(None, None) => None,180	})181}182183fn parse_machines(184	initial: Vec<String>,185	machines: Option<Vec<String>>,186	mut add_machines: Vec<String>,187	mut remove_machines: Vec<String>,188) -> Result<Vec<String>> {189	if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {190		bail!("no operation");191	}192193	let initial_machines = initial.clone();194	let mut target_machines = initial;195	info!("Currently encrypted for {initial_machines:?}");196197	// ensure!(machines.is_some() || !add_machines.is_empty() || )198	if let Some(machines) = machines {199		ensure!(200			add_machines.is_empty() && remove_machines.is_empty(),201			"can't combine --machines and --add-machines/--remove-machines"202		);203		let target = initial_machines.iter().collect::<HashSet<_>>();204		let source = machines.iter().collect::<HashSet<_>>();205		for removed in target.difference(&source) {206			remove_machines.push((*removed).clone());207		}208		for added in source.difference(&target) {209			add_machines.push((*added).clone());210		}211	}212213	for machine in &remove_machines {214		let mut removed = false;215		while let Some(pos) = target_machines.iter().position(|m| m == machine) {216			target_machines.swap_remove(pos);217			removed = true;218		}219		if !removed {220			warn!("secret is not enabled for {machine}");221		}222	}223	for machine in &add_machines {224		if target_machines.iter().any(|m| m == machine) {225			warn!("secret is already added to {machine}");226		} else {227			target_machines.push(machine.to_owned());228		}229	}230	if !remove_machines.is_empty() {231		// TODO: maybe force secret regeneration?232		// Not that useful without revokation.233		warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");234	}235	Ok(target_machines)236}237impl Secret {238	pub async fn run(self, config: &Config) -> Result<()> {239		match self {240			Secret::ForceKeys => {241				for host in config.list_hosts().await? {242					if config.should_skip(&host.name) {243						continue;244					}245					config.key(&host.name).await?;246				}247			}248			Secret::AddShared {249				mut machines,250				name,251				force,252				public,253				public_file,254				expires_at,255				re_add,256			} => {257				let exists = config.has_shared(&name);258				if exists && !force && !re_add {259					bail!("secret already defined");260				}261				if re_add {262					// Fixme: use clap to limit this usage263					ensure!(!force, "--force and --readd are not compatible");264					ensure!(exists, "secret doesn't exists");265					ensure!(266						machines.is_empty(),267						"you can't use machines argument for --readd"268					);269					let shared = config.shared_secret(&name)?;270					machines = shared.owners;271				}272273				let recipients = config274					.recipients(&machines.iter().map(String::as_str).collect_vec())275					.await?;276277				let secret = {278					let mut input = vec![];279					io::stdin().read_to_end(&mut input)?;280281					if input.is_empty() {282						None283					} else {284						Some(285							SecretData::encrypt(recipients, input)286								.ok_or_else(|| anyhow!("no recipients provided"))?,287						)288					}289				};290				let public = parse_public(public, public_file).await?;291				config.replace_shared(292					name,293					FleetSharedSecret {294						owners: machines,295						secret: FleetSecret {296							created_at: Utc::now(),297							expires_at,298							secret,299							public,300						},301					},302				);303			}304			Secret::Add {305				machine,306				name,307				force,308				public,309				public_file,310			} => {311				let recipient = config.recipient(&machine).await?;312313				let secret = {314					let mut input = vec![];315					io::stdin().read_to_end(&mut input)?;316					if input.is_empty() {317						bail!("no data provided")318					}319320					Some(SecretData::encrypt(vec![recipient], input).expect("recipient provided"))321				};322323				if config.has_secret(&machine, &name) && !force {324					bail!("secret already defined");325				}326				let public = parse_public(public, public_file).await?;327328				config.insert_secret(329					&machine,330					name,331					FleetSecret {332						created_at: Utc::now(),333						expires_at: None,334						secret,335						public,336					},337				);338			}339			#[allow(clippy::await_holding_refcell_ref)]340			Secret::Read {341				name,342				machine,343				plaintext,344			} => {345				let secret = config.host_secret(&machine, &name)?;346				let Some(secret) = secret.secret else {347					bail!("no secret {name}");348				};349				let host = config.host(&machine).await?;350				let data = host.decrypt(secret).await?;351				if plaintext {352					let s = String::from_utf8(data).context("output is not utf8")?;353					print!("{s}");354				} else {355					println!("{}", z85::encode(&data));356				}357			}358			Secret::UpdateShared {359				name,360				machines,361				add_machines,362				remove_machines,363				prefer_identities,364			} => {365				let mut secret = config.shared_secret(&name)?;366				if secret.secret.secret.is_none() {367					bail!("no secret");368				}369370				let initial_machines = secret.owners.clone();371				let target_machines = parse_machines(372					initial_machines.clone(),373					machines,374					add_machines,375					remove_machines,376				)?;377378				if target_machines.is_empty() {379					info!("no machines left for secret, removing it");380					config.remove_shared(&name);381					return Ok(());382				}383384				if target_machines == initial_machines {385					warn!("secret owners are already correct");386					return Ok(());387				}388389				let identity_holder = if !prefer_identities.is_empty() {390					prefer_identities391						.iter()392						.find(|i| initial_machines.iter().any(|s| s == *i))393				} else {394					secret.owners.first()395				};396				let Some(identity_holder) = identity_holder else {397					bail!("no available holder found");398				};399				let target_recipients = futures::stream::iter(&target_machines)400					.then(|m| async { config.key(m).await })401					.collect::<Vec<_>>()402					.await;403				let target_recipients =404					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;405406				if let Some(data) = secret.secret.secret {407					let encrypted = config408						.reencrypt_on_host(identity_holder, data, target_recipients)409						.await?;410					secret.secret.secret = Some(encrypted);411				}412413				secret.owners = target_machines;414				config.replace_shared(name, secret);415			}416			Secret::Regenerate { prefer_identities } => {417				{418					let expected_shared_set = config419						.list_configured_shared()420						.await?421						.into_iter()422						.collect::<HashSet<_>>();423					let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();424					for removed in expected_shared_set.difference(&shared_set) {425						info!("generating secret: {removed}");426						let config_field = &config.config_unchecked_field;427						let config_field = nix_go!(config_field.configUnchecked);428						let secret = nix_go!(config_field.sharedSecrets[{ removed }]);429						let shared = generate_shared(config, removed, secret).await?;430						config.replace_shared(removed.to_string(), shared)431					}432				}433				let mut to_remove = Vec::new();434				for name in &config.list_shared() {435					info!("updating secret: {name}");436					let mut data = config.shared_secret(name)?;437					let config_field = &config.config_unchecked_field;438					let config_field = nix_go!(config_field.configUnchecked);439					let expected_owners: Vec<String> =440						nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);441					if expected_owners.is_empty() {442						warn!("secret was removed from fleet config: {name}, removing from data");443						to_remove.push(name.to_string());444						continue;445					}446					let set = data.owners.iter().collect::<HashSet<_>>();447					let expected_set = expected_owners.iter().collect::<HashSet<_>>();448					let should_remove = set.difference(&expected_set).next().is_some();449					if set == expected_set {450						info!("secret data is ok");451						continue;452					}453454					let secret = nix_go!(config_field.sharedSecrets[{ name }]);455					let owner_dependent: bool = nix_go_json!(secret.ownerDependent);456					let regenerate_on_remove: bool = nix_go_json!(secret.regenerateOnOwnerRemoved);457					#[allow(clippy::nonminimal_bool)]458					if !owner_dependent && !(should_remove && regenerate_on_remove) {459						warn!("reencrypting secret '{name}' for new owner set");460						// TODO: force regeneration461						if should_remove {462							warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");463						}464465						let identity_holder = if !prefer_identities.is_empty() {466							prefer_identities467								.iter()468								.find(|i| data.owners.iter().any(|s| s == *i))469						} else {470							data.owners.first()471						};472						let Some(identity_holder) = identity_holder else {473							bail!("no available holder found");474						};475476						let target_recipients = futures::stream::iter(&expected_owners)477							.then(|m| async { config.key(m).await })478							.collect::<Vec<_>>()479							.await;480						let target_recipients =481							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;482483						if let Some(secret) = data.secret.secret {484							let encrypted = config485								.reencrypt_on_host(identity_holder, secret, target_recipients)486								.await?;487488							data.secret.secret = Some(encrypted);489						}490						data.owners = expected_owners;491						config.replace_shared(name.to_owned(), data);492					} else {493						let shared = generate_shared(config, name, secret).await?;494						config.replace_shared(name.to_owned(), shared)495					}496				}497				for k in to_remove {498					config.remove_shared(&k);499				}500			}501			Secret::List {} => {502				let _span = info_span!("loading secrets").entered();503				let configured = config.list_configured_shared().await?;504				#[derive(Tabled)]505				struct SecretDisplay {506					#[tabled(rename = "Name")]507					name: String,508					#[tabled(rename = "Owners")]509					owners: String,510				}511				let mut table = vec![];512				for name in configured.iter().cloned() {513					let config = config.clone();514					let expected_owners = config.shared_secret_expected_owners(&name).await?;515					let data = config.shared_secret(&name)?;516					let owners = data517						.owners518						.iter()519						.map(|o| {520							if expected_owners.contains(o) {521								o.green().to_string()522							} else {523								o.red().to_string()524							}525						})526						.collect::<Vec<_>>();527					table.push(SecretDisplay {528						owners: owners.join(", "),529						name,530					})531				}532				info!("loaded\n{}", Table::new(table).to_string())533			}534		}535		Ok(())536	}537}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}