git.delta.rocks / jrsonnet / refs/commits / eee08d567ee4

difftreelog

refactor remote command management

Yaroslav Bolyukin2023-06-09parent: #837e795.patch.diff
in: trunk

8 files changed

modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,9 +1,10 @@
-use std::{env::current_dir, process::Stdio, time::Duration};
+use std::{env::current_dir, time::Duration};
 
-use crate::{command::CommandExt, host::Config};
+use crate::command::MyCommand;
+use crate::host::Config;
 use anyhow::Result;
 use clap::Parser;
-use tokio::{process::Command, task::LocalSet, time::sleep};
+use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
 #[derive(Parser, Clone)]
@@ -33,6 +34,9 @@
 	}
 
 	pub(crate) fn should_switch_profile(&self) -> bool {
+		matches!(self, Self::Switch | Self::Boot)
+	}
+	pub(crate) fn should_activate(&self) -> bool {
 		matches!(self, Self::Switch | Self::Test)
 	}
 }
@@ -108,13 +112,7 @@
 			dir.path().to_owned()
 		};
 
-		let mut nix_build = if self.privileged_build {
-			let mut out = Command::new("sudo");
-			out.arg("nix");
-			out
-		} else {
-			Command::new("nix")
-		};
+		let mut nix_build = MyCommand::new("nix");
 		nix_build
 			.args([
 				"build",
@@ -122,9 +120,8 @@
 				"--json",
 				// "--show-trace",
 				"--no-link",
-				"--out-link",
 			])
-			.arg(&built)
+			.comparg("--out-link", &built)
 			.arg(
 				config.configuration_attr_name(&format!(
 					"buildSystems.{}.{host}",
@@ -133,6 +130,10 @@
 			)
 			.args(&config.nix_args);
 
+		if self.privileged_build {
+			nix_build = nix_build.sudo();
+		}
+
 		nix_build.run_nix().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
@@ -149,14 +150,11 @@
 					info!("uploading system closure");
 					let mut tries = 0;
 					loop {
-						match Command::new("nix")
-							.args(["copy", "--to"])
-							.arg(format!("ssh://root@{}", host))
-							.arg(&built)
-							.inherit_stdio()
-							.run_nix()
-							.await
-						{
+						let mut nix = MyCommand::new("nix");
+						nix.arg("copy")
+							.comparg("--to", format!("ssh://root@{host}"))
+							.arg(&built);
+						match nix.run_nix().await {
 							Ok(()) => break,
 							Err(e) if tries < 3 => {
 								tries += 1;
@@ -170,24 +168,20 @@
 				if let Some(action) = action {
 					if action.should_switch_profile() {
 						info!("switching generation");
-						config
-							.command_on(&host, "nix-env", true)
-							.args(["-p", "/nix/var/nix/profiles/system", "--set"])
-							.arg(&built)
-							.inherit_stdio()
-							.run()
-							.await?;
+						let mut cmd = MyCommand::new("nix-env");
+						cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+							.comparg("--set", &built);
+						config.run_on(&host, cmd, true).await?;
+					}
+					if action.should_activate() {
+						info!("executing activation script");
+						let mut switch_script = built.clone();
+						switch_script.push("bin");
+						switch_script.push("switch-to-configuration");
+						let mut cmd = MyCommand::new(switch_script);
+						cmd.arg(action.name());
+						config.run_on(&host, cmd, true).await?;
 					}
-					info!("executing activation script");
-					let mut switch_script = built.clone();
-					switch_script.push("bin");
-					switch_script.push("switch-to-configuration");
-					config
-						.command_on(&host, switch_script, true)
-						.arg(action.name())
-						.stdout(Stdio::inherit())
-						.run()
-						.await?;
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
@@ -195,39 +189,30 @@
 				out.push(format!("sd-image-{}", host));
 
 				info!("building sd image to {:?}", out);
-				let mut nix_build = if self.privileged_build {
-					let mut out = Command::new("sudo");
-					out.arg("nix");
-					out
-				} else {
-					Command::new("nix")
-				};
+				let mut nix_build = MyCommand::new("nix");
 				nix_build
-					.args(["build", "--impure", "--no-link", "--out-link"])
-					.arg(&out)
+					.args(["build", "--impure", "--no-link"])
+					.comparg("--out-link", &out)
 					.arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
 					.args(&config.nix_args);
 				if !self.fail_fast {
 					nix_build.arg("--keep-going");
 				}
+				if self.privileged_build {
+					nix_build = nix_build.sudo();
+				}
 
-				nix_build.inherit_stdio().run_nix().await?;
+				nix_build.run_nix().await?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
 				out.push(format!("installation-cd-{}", host));
 
 				info!("building sd image to {:?}", out);
-				let mut nix_build = if self.privileged_build {
-					let mut out = Command::new("sudo");
-					out.arg("nix");
-					out
-				} else {
-					Command::new("nix")
-				};
+				let mut nix_build = MyCommand::new("nix");
 				nix_build
-					.args(["build", "--impure", "--no-link", "--out-link"])
-					.arg(&out)
+					.args(["build", "--impure", "--no-link"])
+					.comparg("--out-link", &out)
 					.arg(
 						config.configuration_attr_name(&format!(
 							"buildSystems.installationCd.{}",
@@ -238,8 +223,11 @@
 				if !self.fail_fast {
 					nix_build.arg("--keep-going");
 				}
+				if self.privileged_build {
+					nix_build = nix_build.sudo();
+				}
 
-				nix_build.inherit_stdio().run_nix().await?;
+				nix_build.run_nix().await?;
 			}
 		};
 		Ok(())
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
after · cmds/fleet/src/cmds/secrets/mod.rs
1use crate::{2	fleetdata::{FleetSecret, FleetSharedSecret},3	host::Config,4};5use anyhow::{bail, ensure, Context, Result};6use clap::Parser;7use futures::{StreamExt, TryStreamExt};8use std::{9	collections::HashSet,10	io::{self, Cursor, Read},11	path::PathBuf,12};13use tokio::fs::read_to_string;14use tracing::{info, warn, error};1516#[derive(Parser)]17pub enum Secrets {18	/// Force load keys for all defined hosts19	ForceKeys,20	/// Add secret, data should be provided in stdin21	AddShared {22		/// Secret name23		name: String,24		/// Secret owners25		machines: Vec<String>,26		/// Override secret if already present27		#[clap(long)]28		force: bool,29		#[clap(long)]30		public: Option<String>,31		#[clap(long)]32		public_file: Option<PathBuf>,33	},34	/// Add secret, data should be provided in stdin35	Add {36		/// Secret name37		name: String,38		/// Secret owners39		machine: String,40		/// Override secret if already present41		#[clap(long)]42		force: bool,43		#[clap(long)]44		public: Option<String>,45		#[clap(long)]46		public_file: Option<PathBuf>,47	},48	/// Read secret from remote host, requires sudo on said host49	Read {50		name: String,51		machine: String,52		#[clap(long)]53		plaintext: bool,54	},55	UpdateShared {56		name: String,5758		#[clap(long)]59		machines: Option<Vec<String>>,6061		#[clap(long)]62		add_machines: Vec<String>,63		#[clap(long)]64		remove_machines: Vec<String>,6566		/// Which host should we use to decrypt67		#[clap(long)]68		prefer_identities: Vec<String>,69	},70	Regenerate {71		/// Which host should we use to decrypt, in case if reencryption is required, without72		/// regeneration73		#[clap(long)]74		prefer_identities: Vec<String>,75	},76}7778impl Secrets {79	pub async fn run(self, config: &Config) -> Result<()> {80		match self {81			Secrets::ForceKeys => {82				for host in config.list_hosts().await? {83					if config.should_skip(&host) {84						continue;85					}86					config.key(&host).await?;87				}88			}89			Secrets::AddShared {90				machines,91				name,92				force,93				public,94				public_file,95			} => {96				let recipients = futures::stream::iter(machines.iter())97					.then(|m| config.recipient(m))98					.try_collect::<Vec<_>>()99					.await?;100101				let secret = {102					let mut input = vec![];103					io::stdin().read_to_end(&mut input)?;104105					if input.is_empty() {106						input107					} else {108						let mut encrypted = vec![];109						let recipients = recipients110							.iter()111							.cloned()112							.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)113							.collect();114						let mut encryptor = age::Encryptor::with_recipients(recipients)115							.expect("recipients provided")116							.wrap_output(&mut encrypted)?;117						io::copy(&mut Cursor::new(input), &mut encryptor)?;118						encryptor.finish()?;119						encrypted120					}121				};122123				if config.has_shared(&name) && !force {124					bail!("secret already defined");125				}126				config.replace_shared(127					name,128					FleetSharedSecret {129						owners: machines,130						secret: FleetSecret {131							expire_at: None,132							secret,133							public: match (public, public_file) {134								(Some(v), None) => Some(v),135								(None, Some(v)) => Some(read_to_string(v).await?),136								(Some(_), Some(_)) => {137									bail!("only public or public_file should be set")138								}139								(None, None) => None,140							},141						},142					},143				);144			}145			Secrets::Add {146				machine,147				name,148				force,149				public,150				public_file,151			} => {152				let recipient = config.recipient(&machine).await?;153154				let secret = {155					let mut input = vec![];156					io::stdin().read_to_end(&mut input)?;157					if input.is_empty() {158						bail!("no data provided")159					}160161					let mut encrypted = vec![];162					let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;163					let mut encryptor = age::Encryptor::with_recipients(vec![recipient])164						.expect("recipients provided")165						.wrap_output(&mut encrypted)?;166					io::copy(&mut Cursor::new(input), &mut encryptor)?;167					encryptor.finish()?;168					encrypted169				};170171				if config.has_secret(&machine, &name) && !force {172					bail!("secret already defined");173				}174				config.insert_secret(175					&machine,176					name,177					FleetSecret {178						expire_at: None,179						secret,180						public: match (public, public_file) {181							(Some(v), None) => Some(v),182							(None, Some(v)) => Some(std::fs::read_to_string(v)?),183							(Some(_), Some(_)) => bail!("only public or public_file should be set"),184							(None, None) => None,185						},186					},187				);188			}189			// TODO: Instead of using sudo, decode secret on remote machine190			#[allow(clippy::await_holding_refcell_ref)]191			Secrets::Read {192				name,193				machine,194				plaintext,195			} => {196				let secret = config.host_secret(&machine, &name)?;197				if secret.secret.is_empty() {198					bail!("no secret {name}");199				}200				let data = config.decrypt_on_host(&machine, secret.secret).await?;201				if plaintext {202					let s = String::from_utf8(data).context("output is not utf8")?;203					print!("{s}");204				} else {205					println!("{}", z85::encode(&data));206				}207			}208			Secrets::UpdateShared {209				name,210				machines,211				mut add_machines,212				mut remove_machines,213				prefer_identities,214			} => {215				if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {216					bail!("no operation");217				}218219				let mut secret = config.shared_secret(&name)?;220				if secret.secret.secret.is_empty() {221					bail!("no secret");222				}223224				let initial_machines = secret.owners.clone();225				let mut target_machines = secret.owners.clone();226				info!("Currently encrypted for {initial_machines:?}");227228				// ensure!(machines.is_some() || !add_machines.is_empty() || )229				if let Some(machines) = machines {230					ensure!(231						add_machines.is_empty() && remove_machines.is_empty(),232						"can't combine --machines and --add-machines/--remove-machines"233					);234					let target = initial_machines.iter().collect::<HashSet<_>>();235					let source = machines.iter().collect::<HashSet<_>>();236					for removed in target.difference(&source) {237						remove_machines.push((*removed).clone());238					}239					for added in source.difference(&target) {240						add_machines.push((*added).clone());241					}242				}243244				for machine in &remove_machines {245					let mut removed = false;246					while let Some(pos) = target_machines.iter().position(|m| m == machine) {247						target_machines.swap_remove(pos);248						removed = true;249					}250					if !removed {251						warn!("secret is not enabled for {machine}");252					}253				}254				for machine in &add_machines {255					if target_machines.iter().any(|m| m == machine) {256						warn!("secret is already added to {machine}");257					} else {258						target_machines.push(machine.to_owned());259					}260				}261				if !remove_machines.is_empty() {262					warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");263				}264265				if target_machines.is_empty() {266					info!("no machines left for secret, removing it");267					config.remove_shared(&name);268					return Ok(());269				}270271				if target_machines == initial_machines {272					warn!("secret owners are already correct");273					return Ok(());274				}275276				let identity_holder = if !prefer_identities.is_empty() {277					prefer_identities278						.iter()279						.find(|i| initial_machines.iter().any(|s| s == *i))280				} else {281					secret.owners.first()282				};283				let Some(identity_holder) = identity_holder else {284                    bail!("no available holder found");285                };286				let target_recipients = futures::stream::iter(&target_machines)287					.then(|m| async { config.key(m).await })288					.collect::<Vec<_>>()289					.await;290				let target_recipients =291					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;292293				let encrypted = config294					.reencrypt_on_host(&identity_holder, secret.secret.secret, target_recipients)295					.await?;296297				secret.owners = target_machines;298				secret.secret.secret = encrypted;299				config.replace_shared(name, secret);300			}301			Secrets::Regenerate { prefer_identities } => {302				{303					let expected_shared_set =304						config.shared_config_attr_names("sharedSecrets").await?;305					let expected_shared_set = expected_shared_set.iter().collect::<HashSet<_>>();306					let shared_set = config.list_shared();307					let shared_set = shared_set.iter().collect::<HashSet<_>>();308					for removed in expected_shared_set.difference(&shared_set) {309						warn!("secret needs to be generated: {removed}")310					}311				}312				let mut to_remove = Vec::new();313				for name in &config.list_shared() {314					info!("updating secret: {name}");315					let mut data = config.shared_secret(name)?;316					let expected_owners: Vec<String> = config317						.shared_config_attr(&format!("sharedSecrets.\"{name}\".expectedOwners"))318						.await?;319					if expected_owners.is_empty() {320						warn!("secret was removed from fleet config: {name}, removing from data");321						to_remove.push(name.to_string());322						continue;323					}324					let set = data.owners.iter().collect::<HashSet<_>>();325					let expected_set = expected_owners.iter().collect::<HashSet<_>>();326					let should_remove = set.difference(&expected_set).next().is_some();327					if set != expected_set {328						let owner_dependent: bool = config329							.shared_config_attr(&format!("sharedSecrets.\"{name}\".ownerDependent"))330							.await?;331						if !owner_dependent {332							warn!("reencrypting secret '{name}' for new owner set");333							// TODO: force regeneration334							if should_remove {335								warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");336							}337338							let identity_holder = if !prefer_identities.is_empty() {339								prefer_identities340									.iter()341									.find(|i| data.owners.iter().any(|s| s == *i))342							} else {343								data.owners.first()344							};345							let Some(identity_holder) = identity_holder else {346								bail!("no available holder found");347							};348349							let target_recipients = futures::stream::iter(&expected_owners)350								.then(|m| async { config.key(m).await })351								.collect::<Vec<_>>()352								.await;353							let target_recipients =354								target_recipients.into_iter().collect::<Result<Vec<_>>>()?;355356							let encrypted = config357								.reencrypt_on_host(358									&identity_holder,359									data.secret.secret,360									target_recipients,361								)362								.await?;363364							data.secret.secret = encrypted;365							data.owners = expected_owners;366							config.replace_shared(name.to_owned(), data);367						} else if let Some(generator) = config368							.shared_config_attr::<Option<String>>(&format!("sharedSecrets.\"{name}\".generator"))369							.await?370						{371							todo!("regenerate secret {name} with {generator}");372						} else {373							error!("secret '{name}' should be regenerated manually");374						}375					} else {376						info!("secret data is ok")377					}378				}379				for k in to_remove {380					config.remove_shared(&k);381				}382			}383		}384		Ok(())385	}386}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,4 +1,4 @@
-use std::{ffi::OsStr, process::Stdio};
+use std::{ffi::OsStr, process::Stdio, task::Poll};
 
 use anyhow::{Context, Result};
 use async_trait::async_trait;
@@ -7,20 +7,292 @@
 	de::{DeserializeOwned, Visitor},
 	Deserialize,
 };
-use tokio::{process::Command, select};
+use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
 use tracing::{info, warn};
 
+fn escape_bash(input: &str, out: &mut String) {
+	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
+	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
+		out.push_str(input);
+		return;
+	}
+	out.push('\'');
+	for (i, v) in input.split('\'').enumerate() {
+		if i != 0 {
+			out.push_str("'\"'\"'");
+		}
+		out.push_str(v);
+	}
+	out.push('\'');
+}
+fn ostoutf8(os: impl AsRef<OsStr>) -> String {
+	os.as_ref().to_str().expect("non-utf8 data").to_owned()
+}
+#[derive(Clone)]
+pub struct MyCommand {
+	command: String,
+	args: Vec<String>,
+	env: Vec<(String, String)>,
+}
+impl MyCommand {
+	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
+		assert!(!cmd.as_ref().is_empty());
+		Self {
+			command: ostoutf8(cmd),
+			args: vec![],
+			env: vec![],
+		}
+	}
+	fn into_args(self) -> Vec<String> {
+		let mut out = Vec::new();
+		if !self.env.is_empty() {
+			out.push("env".to_owned());
+			for (k, v) in self.env {
+				assert!(!k.contains("="));
+				out.push(format!("{k}={v}"));
+			}
+		}
+		out.push(self.command);
+		out.extend(self.args.into_iter());
+		out
+	}
+	fn into_string(self) -> String {
+		let mut out = String::new();
+		if !self.env.is_empty() {
+			out.push_str("env");
+			for (k, v) in self.env {
+				out.push(' ');
+				assert!(!k.contains("="));
+				escape_bash(&k, &mut out);
+				out.push('=');
+				escape_bash(&v, &mut out);
+			}
+		}
+		if !out.is_empty() {
+			out.push(' ');
+		}
+		escape_bash(&self.command, &mut out);
+		for arg in self.args {
+			out.push(' ');
+			escape_bash(&arg, &mut out);
+		}
+		out
+	}
+	fn into_command(self) -> Command {
+		let mut out = Command::new(self.command);
+		out.args(self.args);
+		for (k, v) in self.env {
+			out.env(k, v);
+		}
+		out
+	}
+	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
+		let arg = arg.as_ref();
+		self.args.push(ostoutf8(arg));
+		self
+	}
+	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+		let arg = arg.as_ref();
+		let value = value.as_ref();
+		let arg = ostoutf8(arg);
+		let value = ostoutf8(value);
+		self.arg(format!("{arg}={value}"));
+		self
+	}
+	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {
+		self.arg(arg);
+		self.arg(value);
+		self
+	}
+	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
+		for arg in args.into_iter() {
+			let arg = arg.as_ref();
+			self.args.push(ostoutf8(arg));
+		}
+		self
+	}
+	pub fn sudo(self) -> Self {
+		let mut out = Self::new("sudo");
+		out.args(self.into_args());
+		out
+	}
+	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+		let mut out = Self::new("ssh");
+		out.arg(on).arg("--");
+		out.arg(self.into_string());
+		out
+	}
+
+	pub async fn run(self) -> Result<()> {
+		let str = self.clone().into_string();
+		info!("running {str}");
+		let mut cmd = self.into_command();
+		cmd.inherit_stdio();
+		let out = cmd.spawn()?.wait_with_output().await?;
+		if !out.status.success() {
+			anyhow::bail!("command '{}' failed with status {}", str, out.status);
+		}
+		Ok(())
+	}
+	pub async fn run_string(self) -> Result<String> {
+		let str = self.clone().into_string();
+		info!("running {str}");
+		let mut cmd = self.into_command();
+		cmd.inherit_stdio();
+		cmd.stdout(Stdio::piped());
+		let out = cmd.spawn()?.wait_with_output().await?;
+		if !out.status.success() {
+			anyhow::bail!("command '{}' failed with status {}", str, out.status);
+		}
+		Ok(String::from_utf8(out.stdout)?)
+	}
+	pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {
+		let str = self.run_nix_string().await?;
+		serde_json::from_str(&str).with_context(|| format!("{:?}", str))
+	}
+
+	pub async fn run_nix_string(self) -> Result<String> {
+		let str = self.clone().into_string();
+		let mut cmd = self.into_command();
+		cmd.stdout(Stdio::piped());
+		run_nix_inner(str, cmd).await.map(|v| v.unwrap())
+	}
+	pub async fn run_nix(self) -> Result<()> {
+		let str = self.clone().into_string();
+		let mut cmd = self.into_command();
+		cmd.stdout(Stdio::inherit());
+		run_nix_inner(str, cmd).await.map(|v| {
+			assert!(v.is_none());
+		})
+	}
+}
+
+struct EmptyAsyncRead;
+impl AsyncRead for EmptyAsyncRead {
+	fn poll_read(
+		self: std::pin::Pin<&mut Self>,
+		_cx: &mut std::task::Context<'_>,
+		_buf: &mut tokio::io::ReadBuf<'_>,
+	) -> Poll<std::io::Result<()>> {
+		Poll::Pending
+	}
+}
+
+async fn run_nix_inner(str: String, mut cmd: Command) -> Result<Option<String>> {
+	info!("running {str}");
+	cmd.arg("--log-format").arg("internal-json");
+	cmd.stderr(Stdio::piped());
+	let mut child = cmd.spawn()?;
+	let mut stderr = child.stderr.take().unwrap();
+	let stdout = child.stdout.take();
+	let wants_stdout = stdout.is_some();
+	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+	let mut out: Box<dyn AsyncRead + Unpin> = stdout
+		.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin>)
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut out = FramedRead::new(&mut out, BytesCodec::new());
+
+	// while let Some(line) = read.next().await? {}
+
+	let mut out_buf = if wants_stdout { Some(vec![]) } else { None };
+	loop {
+		select! {
+			e = err.next() => {
+				if let Some(e) = e {
+					let e = e?;
+					if let Some(e) = e.strip_prefix("@nix ") {
+
+						let log: NixLog = match serde_json::from_str(e) {
+							Ok(l) => l,
+							Err(err) => {
+								warn!("failed to parse nix log line {:?}: {}", e, err);
+								continue;
+							},
+						};
+						match log {
+							NixLog::Msg { msg, raw_msg, .. } => {
+								if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+									&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+									&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+									if let Some(raw_msg) = raw_msg {
+										info!(target: "nix", "{raw_msg}\n{msg}")
+									}else {
+										info!(target: "nix", "{msg}")
+
+									}
+								}
+							},
+							NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
+								if let [LogField::String(drv), ..] = &fields[..] {
+									info!(target: "nix","building {}", drv)
+								} else {
+									warn!("bad build log: {:?}", log)
+								}
+							},
+							NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
+								if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
+									info!(target: "nix","copying {} {} -> {}", drv, from, to)
+								} else {
+									warn!("bad copy log: {:?}", log)
+								}
+							},
+							NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
+								if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
+									info!(target: "nix", "{}", text)
+								}
+							},
+							NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
+								// Cache lookup? Coupled with copy log
+							},
+							NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
+								// Cache lookup
+							}
+							NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
+								// NAR downloading, coupled with copy log
+							}
+							NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
+								// Useless repeating notification about build
+							}
+							NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation:  ") => {
+								// CA resolved
+							}
+							NixLog::Stop { .. } => {},
+							NixLog::Result { .. } => {},
+							_ => warn!("unknown log: {:?}", log)
+						};
+					} else {
+						warn!(target="nix","unknown: {}", e)
+					}
+				}
+			},
+			o = out.next() => {
+				if let Some(o) = o {
+					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+				}
+			},
+			code = child.wait() => {
+				let code = code?;
+				if !code.success() {
+					anyhow::bail!("command '{str}' failed with status {}", code);
+				}
+				break;
+			}
+		}
+	}
+
+	Ok(out_buf.map(String::from_utf8).transpose()?)
+}
+
 #[async_trait]
 pub trait CommandExt {
-	async fn run_nix(&mut self) -> Result<()>;
-	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;
-	async fn run_nix_string(&mut self) -> Result<String>;
-	async fn run(&mut self) -> Result<()>;
-	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;
-	async fn run_string(&mut self) -> Result<String>;
+	// async fn run_nix(&mut self) -> Result<()>;
+	// async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;
+	// async fn run_nix_string(&mut self) -> Result<String>;
+	// async fn run(&mut self) -> Result<()>;
+	// async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;
+	// async fn run_string(&mut self) -> Result<String>;
 	fn inherit_stdio(&mut self) -> &mut Self;
-	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;
 }
 
 #[derive(Debug)]
@@ -91,170 +363,9 @@
 
 #[async_trait]
 impl CommandExt for Command {
-	async fn run_nix(&mut self) -> Result<()> {
-		self.run_nix_string().await.map(|_| ())
-	}
-	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {
-		let str = self.run_nix_string().await?;
-		serde_json::from_str(&str).with_context(|| format!("{:?}", str))
-	}
-
-	async fn run_nix_string(&mut self) -> Result<String> {
-		self.arg("--log-format").arg("internal-json");
-		self.stderr(Stdio::piped());
-		self.stdout(Stdio::piped());
-		let mut child = self.spawn()?;
-		let mut stderr = child.stderr.take().unwrap();
-		let mut stdout = child.stdout.take().unwrap();
-		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
-		let mut out = FramedRead::new(&mut stdout, BytesCodec::new());
-
-		// while let Some(line) = read.next().await? {}
-
-		let mut out_buf = vec![];
-		loop {
-			select! {
-				e = err.next() => {
-					if let Some(e) = e {
-						let e = e?;
-						if let Some(e) = e.strip_prefix("@nix ") {
-
-							let log: NixLog = match serde_json::from_str(e) {
-								Ok(l) => l,
-								Err(err) => {
-									warn!("failed to parse nix log line {:?}: {}", e, err);
-									continue;
-								},
-							};
-							match log {
-								NixLog::Msg { msg, raw_msg, .. } => {
-									if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-										&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-										&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-										if let Some(raw_msg) = raw_msg {
-											info!(target: "nix", "{raw_msg}\n{msg}")
-										}else {
-											info!(target: "nix", "{msg}")
-
-										}
-									}
-								},
-								NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {
-									if let [LogField::String(drv), ..] = &fields[..] {
-										info!(target: "nix","building {}", drv)
-									} else {
-										warn!("bad build log: {:?}", log)
-									}
-								},
-								NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {
-									if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {
-										info!(target: "nix","copying {} {} -> {}", drv, from, to)
-									} else {
-										warn!("bad copy log: {:?}", log)
-									}
-								},
-								NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {
-									if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {
-										info!(target: "nix", "{}", text)
-									}
-								},
-								NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {
-									// Cache lookup? Coupled with copy log
-								},
-								NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {
-									// Cache lookup
-								}
-								NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {
-									// NAR downloading, coupled with copy log
-								}
-								NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {
-									// Useless repeating notification about build
-								}
-								NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation:  ") => {
-									// CA resolved
-								}
-								NixLog::Stop { .. } => {},
-								NixLog::Result { .. } => {},
-								_ => warn!("unknown log: {:?}", log)
-							};
-						} else {
-							warn!(target="nix","unknown: {}", e)
-						}
-					}
-				},
-				o = out.next() => {
-					if let Some(o) = o {
-						out_buf.extend_from_slice(&o?);
-					}
-				},
-				code = child.wait() => {
-					let code = code?;
-					if !code.success() {
-						anyhow::bail!("command ({:?}) failed with status {}", self, code);
-					}
-					break;
-				}
-			}
-		}
-
-		Ok(String::from_utf8(out_buf)?)
-	}
-
 	fn inherit_stdio(&mut self) -> &mut Self {
 		self.stderr(Stdio::inherit());
+		self.stdout(Stdio::inherit());
 		self
-	}
-
-	async fn run(&mut self) -> Result<()> {
-		self.stderr(Stdio::piped());
-		self.stdout(Stdio::piped());
-		let mut child = self.spawn()?;
-		let mut stderr = child.stderr.take().unwrap();
-		let mut stdout = child.stdout.take().unwrap();
-		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
-		let mut out = FramedRead::new(&mut stdout, LinesCodec::new());
-		loop {
-			select! {
-				e = err.next() => {
-					if let Some(e) = e {
-						warn!("{}", e?);
-					}
-				},
-				o = out.next() => {
-					if let Some(o) = o {
-						info!("{}", o?);
-					}
-				},
-				code = child.wait() => {
-					let code = code?;
-					if !code.success() {
-						anyhow::bail!("command ({:?}) failed with status {}", self, code);
-					}
-					break;
-				}
-			}
-		}
-		Ok(())
-	}
-
-	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {
-		let str = self.run_string().await?;
-		serde_json::from_str(&str).with_context(|| format!("{:?}", str))
-	}
-
-	async fn run_string(&mut self) -> Result<String> {
-		self.inherit_stdio();
-		self.stdout(Stdio::piped());
-		let out = self.spawn()?.wait_with_output().await?;
-		if !out.status.success() {
-			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);
-		}
-		Ok(String::from_utf8(out.stdout)?)
-	}
-
-	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {
-		let mut cmd = Command::new("ssh");
-		cmd.arg(host).arg("--").arg(command);
-		cmd
 	}
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,7 +1,7 @@
 use std::{
 	cell::{Ref, RefCell, RefMut},
 	env::current_dir,
-	ffi::{OsStr, OsString},
+	ffi::OsString,
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
@@ -12,10 +12,9 @@
 use clap::{ArgGroup, Parser};
 use serde::de::DeserializeOwned;
 use tempfile::NamedTempFile;
-use tokio::process::Command;
 
 use crate::{
-	command::CommandExt,
+	command::MyCommand,
 	fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
 };
 
@@ -52,24 +51,24 @@
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
 	}
 
-	pub fn command_on(&self, host: &str, program: impl AsRef<OsStr>, sudo: bool) -> Command {
-		if self.is_local(host) {
-			if sudo {
-				let mut cmd = Command::new("sudo");
-				cmd.arg(program);
-				cmd
-			} else {
-				Command::new(program)
-			}
-		} else {
-			let mut cmd = Command::new("ssh");
-			cmd.arg(host).arg("--");
-			if sudo {
-				cmd.arg("sudo");
-			}
-			cmd.arg(program);
-			cmd
+	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
+		if sudo {
+			command = command.sudo();
+		}
+		if !self.is_local(host) {
+			command = command.ssh(host);
+		}
+		command.run().await
+	}
+	#[must_use]
+	pub async fn run_string_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<String> {
+		if sudo {
+			command = command.sudo();
+		}
+		if !self.is_local(host) {
+			command = command.ssh(host);
 		}
+		command.run_string().await
 	}
 
 	pub fn configuration_attr_name(&self, name: &str) -> OsString {
@@ -83,36 +82,36 @@
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<String>> {
-		Command::new("nix")
-			.arg("eval")
+		let mut cmd = MyCommand::new("nix");
+		cmd.arg("eval")
 			.arg(self.configuration_attr_name("configuredHosts"))
 			.args(["--apply", "builtins.attrNames", "--json", "--show-trace"])
-			.args(&self.nix_args)
-			.run_nix_json()
+			.args(&self.nix_args);
+		cmd.run_nix_json()
 			.await
 	}
 	pub async fn shared_config_attr<T: DeserializeOwned>(&self, attr: &str) -> Result<T> {
-		Command::new("nix")
-			.arg("eval")
+		let mut cmd = MyCommand::new("nix");
+		cmd.arg("eval")
 			.arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
 			.args(["--json", "--show-trace"])
-			.args(&self.nix_args)
-			.run_nix_json()
+			.args(&self.nix_args);
+		cmd.run_nix_json()
 			.await
 	}
 	pub async fn shared_config_attr_names(&self, attr: &str) -> Result<Vec<String>> {
-		Command::new("nix")
-			.arg("eval")
+		let mut cmd = MyCommand::new("nix");
+		cmd.arg("eval")
 			.arg(self.configuration_attr_name(&format!("configUnchecked.{}", attr)))
 			.args(["--apply", "builtins.attrNames"])
 			.args(["--json", "--show-trace"])
-			.args(&self.nix_args)
-			.run_nix_json()
+			.args(&self.nix_args);
+		cmd.run_nix_json()
 			.await
 	}
 	pub async fn config_attr<T: DeserializeOwned>(&self, host: &str, attr: &str) -> Result<T> {
-		Command::new("nix")
-			.arg("eval")
+		let mut cmd = MyCommand::new("nix");
+		cmd.arg("eval")
 			.arg(
 				self.configuration_attr_name(&format!(
 					"configuredSystems.{}.config.{}",
@@ -120,8 +119,8 @@
 				)),
 			)
 			.args(["--json", "--show-trace"])
-			.args(&self.nix_args)
-			.run_nix_json()
+			.args(&self.nix_args);
+		cmd.run_nix_json()
 			.await
 	}
 
@@ -171,23 +170,20 @@
 
 	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>>{
 		let data = z85::encode(&data);
-		let encoded = self.command_on(host, "fleet-install-secrets", true)
-			.arg("decrypt")
-			.arg("--secret")
-			.arg(data).run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
+		let mut cmd = MyCommand::new("fleet-install-secrets");
+		cmd.arg("decrypt").eqarg("--secret", data);
+		cmd = cmd.sudo().ssh(host);
+		let encoded = cmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
 		Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
 	}
 	pub async fn reencrypt_on_host(&self, host: &str, data: Vec<u8>, targets: Vec<String>) -> Result<Vec<u8>>{
 		let data = z85::encode(&data);
-		let mut recmd = self.command_on(host, "fleet-install-secrets", true);
-		recmd
-			.arg("reencrypt")
-			.arg("--secret")
-			.arg(format!("\"{}\"", data.replace('$', "\\$")));
+		let mut recmd = MyCommand::new("fleet-install-secrets");
+		recmd.arg("reencrypt").eqarg("--secret",data);
 		for target in targets {
-			recmd.arg("--targets");
-			recmd.arg(format!("\"{target}\""));
+			recmd.eqarg("--targets", target);
 		}
+		recmd = recmd.sudo().ssh(host);
 		let encoded = recmd.run_string().await.context("failed to call remote host for decrypt")?.trim().to_owned();
 		Ok(z85::decode(encoded).context("bad encoded data? outdated host?")?)
 	}
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,7 @@
 use std::str::FromStr;
 
-use crate::{command::CommandExt, host::Config};
+use crate::command::MyCommand;
+use crate::host::Config;
 use anyhow::{anyhow, Result};
 use tracing::warn;
 
@@ -26,11 +27,9 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let key = self
-				.command_on(host, "cat", false)
-				.arg("/etc/ssh/ssh_host_ed25519_key.pub")
-				.run_string()
-				.await?;
+			let mut cmd = MyCommand::new("cat");
+			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
+			let key = self.run_string_on(host, cmd, false).await?;
 			self.update_key(host, key.clone());
 			Ok(key)
 		}
modifiedcmds/install-secrets/src/main.rsdiffbeforeafterboth
--- a/cmds/install-secrets/src/main.rs
+++ b/cmds/install-secrets/src/main.rs
@@ -250,7 +250,7 @@
 
 			if plaintext {
 				let s = String::from_utf8(decrypted).context("output is not utf8")?;
-				print!("{}", s);
+				print!("{s}");
 			} else {
 				println!("{}", SecretWrapper(decrypted));
 			}
modifiedmodules/fleet/secrets.nixdiffbeforeafterboth
--- a/modules/fleet/secrets.nix
+++ b/modules/fleet/secrets.nix
@@ -2,15 +2,6 @@
 let
   sharedSecret = with types; {
     options = {
-      owners = mkOption {
-        type = listOf str;
-        description = ''
-          For which owners this secret is currently encrypted,
-          if not matches expectedOwners - then this secret is considered outdated, and
-          should be regenerated/reencrypted
-        '';
-        default = [ ];
-      };
       expectedOwners = mkOption {
         type = listOf str;
         description = ''
@@ -25,12 +16,38 @@
         description = "Is this secret owner-dependent, and needs to be regenerated on ownership set change, or it may be just reencrypted";
       };
       generator = mkOption {
-        type = nullOr package;
-        description = ''
-          Derivation to execute for secret generation
+        type = nullOr (submodule {
+          packages = mkOption {
+            type = attrsOf package;
+            description = ''
+              Derivation to execute for shared secret generation (key = system).
+              This derivation should produce directory, with exactly two files:
+                - publicData
+                - encryptedSecretData
+
+              If null - secret value may only be created manually.
+            '';
+          };
+          expectedData = mkOption {
+            type = types.unspecified;
+            description = "Data expected to be used for secret generation, if doesn't match specified - secret should be regenerated";
+          };
+          dependencies = mkOption {
+            type = listOf str;
+            description = ''
+              List of secrets, on which this secret depends.
 
-          If null - may only be created manually
-        '';
+              During generation, generator command will be ran on host, which already has specified secrets generated.
+            '';
+            default = [];
+          };
+          data = mkOption {
+            type = types.unspecified;
+            description = "Data used for secret generation. Imported from fleet.nix";
+            default = null;
+            internal = true;
+          };
+        });
         default = null;
       };
       expireIn = mkOption {
@@ -38,15 +55,28 @@
         description = "Time in hours, in which this secret should be regenerated";
         default = null;
       };
+
+      owners = mkOption {
+        type = listOf str;
+        description = ''
+          For which owners this secret is currently encrypted,
+          if not matches expectedOwners - then this secret is considered outdated, and
+          should be regenerated/reencrypted.
+
+          Imported from fleet.nix
+        '';
+        default = [ ];
+      };
       public = mkOption {
         type = nullOr str;
-        description = "Secret public data";
+        description = "Secret public data. Imported from fleet.nix";
         default = null;
       };
       secret = mkOption {
         type = nullOr str;
-        description = "Encrypted secret data";
+        description = "Encrypted secret data. Imported from fleet.nix";
         default = null;
+        internal = true;
       };
     };
   };
modifiednixos/secrets.nixdiffbeforeafterboth
--- a/nixos/secrets.nix
+++ b/nixos/secrets.nix
@@ -5,14 +5,14 @@
 let
   sysConfig = config;
   secretType = types.submodule ({ config, ... }: {
-    config = rec {
-      stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${config._module.args.name}";
-      secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${config._module.args.name}";
-      secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else "<missingno>");
+    config = let secretName = config._module.args.name; in rec {
+      stableSecretPath = mkOptionDefault "/run/secrets/secret-stable-${secretName}";
+      secretPath = mkOptionDefault "/run/secrets/secret-${config.secretHash}-${secretName}";
+      secretHash = mkOptionDefault (if config.secret != null then (builtins.hashString "sha1" config.secret) else throw "secret is not defined for secret ${secretName}");
 
-      stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${config._module.args.name}";
-      publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${config._module.args.name}";
-      publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else "<missingno>");
+      stablePublicPath = mkOptionDefault "/run/secrets/public-stable-${secretName}";
+      publicPath = mkOptionDefault "/run/secrets/public-${config.publicHash}-${secretName}";
+      publicHash = mkOptionDefault (if config.public != null then (builtins.hashString "sha1" config.public) else throw "public is not defined for secret ${secretName}");
     };
     options = {
       public = mkOption {
@@ -77,7 +77,13 @@
   });
   secretsFile = pkgs.writeTextFile {
     name = "secrets.json";
-    text = builtins.toJSON config.secrets;
+    text = builtins.toJSON (mapAttrs (_: value: rec {
+      inherit (value) group mode owner secret public;
+      publicPath = if public != null then value.publicPath else "/missingno";
+      stablePublicPath = if public != null then value.stablePublicPath else "/missingno";
+      secretPath = if secret != null then value.secretPath else "/missingno";
+      stableSecretPath = if secret != null then value.stableSecretPath else "/missingno";
+    }) config.secrets);
   };
 in
 {