git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/secrets/mod.rs
1use crate::{2	fleetdata::{FleetSecret, FleetSharedSecret},3	host::Config,4	nix_go, nix_go_json,5};6use anyhow::{anyhow, bail, ensure, Context, Result};7use chrono::{DateTime, Utc};8use clap::Parser;9use futures::{StreamExt, TryStreamExt};10use owo_colors::OwoColorize;11use std::{12	collections::HashSet,13	io::{self, Cursor, Read},14	path::PathBuf,15};16use tabled::{Table, Tabled};17use tokio::fs::read_to_string;18use tracing::{error, info, info_span, warn};1920#[derive(Parser)]21pub enum Secret {22	/// Force load host keys for all defined hosts23	ForceKeys,24	/// Add secret, data should be provided in stdin25	AddShared {26		/// Secret name27		name: String,28		/// Secret owners29		machines: Vec<String>,30		/// Override secret if already present31		#[clap(long)]32		force: bool,33		/// Secret public part34		#[clap(long)]35		public: Option<String>,36		/// Load public part from specified file37		#[clap(long)]38		public_file: Option<PathBuf>,3940		/// Create a notification on secret expiration41		#[clap(long)]42		expires_at: Option<DateTime<Utc>>,4344		/// Secret with this name already exists, override its value while keeping the same owners.45		#[clap(long)]46		re_add: bool,47	},48	/// Add secret, data should be provided in stdin49	Add {50		/// Secret name51		name: String,52		/// Secret owners53		machine: String,54		/// Override secret if already present55		#[clap(long)]56		force: bool,57		#[clap(long)]58		public: Option<String>,59		#[clap(long)]60		public_file: Option<PathBuf>,61	},62	/// Read secret from remote host, requires sudo on said host63	Read {64		name: String,65		machine: String,66		#[clap(long)]67		plaintext: bool,68	},69	UpdateShared {70		name: String,7172		#[clap(long)]73		machines: Option<Vec<String>>,7475		#[clap(long)]76		add_machines: Vec<String>,77		#[clap(long)]78		remove_machines: Vec<String>,7980		/// Which host should we use to decrypt81		#[clap(long)]82		prefer_identities: Vec<String>,83	},84	Regenerate {85		/// Which host should we use to decrypt, in case if reencryption is required, without86		/// regeneration87		#[clap(long)]88		prefer_identities: Vec<String>,89	},90	List {},91	InvokeGenerator,92}9394impl Secret {95	pub async fn run(self, config: &Config) -> Result<()> {96		match self {97			Secret::InvokeGenerator => {98				let config_field = &config.config_unchecked_field;99100				let generate_impure =101					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);102				let on = nix_go!(generate_impure.on);103				let call_package = nix_go!(104					config_field.buildableSystems(Obj {105						localSystem: { config.local_system.clone() }106					})[on]107						.config108						.nixpkgs109						.pkgs110						.callPackage111				);112				let generator = nix_go!(call_package(generate_impure.generator));113				let built = generator.build().await?;114				// .as_json().await?;115				dbg!(&built);116			}117			Secret::ForceKeys => {118				for host in config.list_hosts().await? {119					if config.should_skip(&host.name) {120						continue;121					}122					config.key(&host.name).await?;123				}124			}125			Secret::AddShared {126				mut machines,127				name,128				force,129				public,130				public_file,131				expires_at,132				re_add,133			} => {134				let exists = config.has_shared(&name);135				if exists && !force && !re_add {136					bail!("secret already defined");137				}138				if re_add {139					// Fixme: use clap to limit this usage140					ensure!(!force, "--force and --readd are not compatible");141					ensure!(exists, "secret doesn't exists");142					ensure!(143						machines.is_empty(),144						"you can't use machines argument for --readd"145					);146					let shared = config.shared_secret(&name)?;147					machines = shared.owners;148				}149150				let recipients = futures::stream::iter(machines.iter())151					.then(|m| config.recipient(m))152					.try_collect::<Vec<_>>()153					.await?;154155				let secret = {156					let mut input = vec![];157					io::stdin().read_to_end(&mut input)?;158159					if input.is_empty() {160						input161					} else {162						let mut encrypted = vec![];163						let recipients = recipients164							.iter()165							.cloned()166							.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)167							.collect();168						let mut encryptor = age::Encryptor::with_recipients(recipients)169							.ok_or_else(|| anyhow!("no recipients provided"))?170							.wrap_output(&mut encrypted)?;171						io::copy(&mut Cursor::new(input), &mut encryptor)?;172						encryptor.finish()?;173						encrypted174					}175				};176				config.replace_shared(177					name,178					FleetSharedSecret {179						owners: machines,180						secret: FleetSecret {181							created_at: Utc::now(),182							expires_at,183							secret,184							public: match (public, public_file) {185								(Some(v), None) => Some(v),186								(None, Some(v)) => Some(read_to_string(v).await?),187								(Some(_), Some(_)) => {188									bail!("only public or public_file should be set")189								}190								(None, None) => None,191							},192						},193					},194				);195			}196			Secret::Add {197				machine,198				name,199				force,200				public,201				public_file,202			} => {203				let recipient = config.recipient(&machine).await?;204205				let secret = {206					let mut input = vec![];207					io::stdin().read_to_end(&mut input)?;208					if input.is_empty() {209						bail!("no data provided")210					}211212					let mut encrypted = vec![];213					let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;214					let mut encryptor = age::Encryptor::with_recipients(vec![recipient])215						.expect("recipients provided")216						.wrap_output(&mut encrypted)?;217					io::copy(&mut Cursor::new(input), &mut encryptor)?;218					encryptor.finish()?;219					encrypted220				};221222				if config.has_secret(&machine, &name) && !force {223					bail!("secret already defined");224				}225				config.insert_secret(226					&machine,227					name,228					FleetSecret {229						created_at: Utc::now(),230						expires_at: None,231						secret,232						public: match (public, public_file) {233							(Some(v), None) => Some(v),234							(None, Some(v)) => Some(std::fs::read_to_string(v)?),235							(Some(_), Some(_)) => bail!("only public or public_file should be set"),236							(None, None) => None,237						},238					},239				);240			}241			// TODO: Instead of using sudo, decode secret on remote machine242			#[allow(clippy::await_holding_refcell_ref)]243			Secret::Read {244				name,245				machine,246				plaintext,247			} => {248				let secret = config.host_secret(&machine, &name)?;249				if secret.secret.is_empty() {250					bail!("no secret {name}");251				}252				let data = config.decrypt_on_host(&machine, secret.secret).await?;253				if plaintext {254					let s = String::from_utf8(data).context("output is not utf8")?;255					print!("{s}");256				} else {257					println!("{}", z85::encode(&data));258				}259			}260			Secret::UpdateShared {261				name,262				machines,263				mut add_machines,264				mut remove_machines,265				prefer_identities,266			} => {267				if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {268					bail!("no operation");269				}270271				let mut secret = config.shared_secret(&name)?;272				if secret.secret.secret.is_empty() {273					bail!("no secret");274				}275276				let initial_machines = secret.owners.clone();277				let mut target_machines = secret.owners.clone();278				info!("Currently encrypted for {initial_machines:?}");279280				// ensure!(machines.is_some() || !add_machines.is_empty() || )281				if let Some(machines) = machines {282					ensure!(283						add_machines.is_empty() && remove_machines.is_empty(),284						"can't combine --machines and --add-machines/--remove-machines"285					);286					let target = initial_machines.iter().collect::<HashSet<_>>();287					let source = machines.iter().collect::<HashSet<_>>();288					for removed in target.difference(&source) {289						remove_machines.push((*removed).clone());290					}291					for added in source.difference(&target) {292						add_machines.push((*added).clone());293					}294				}295296				for machine in &remove_machines {297					let mut removed = false;298					while let Some(pos) = target_machines.iter().position(|m| m == machine) {299						target_machines.swap_remove(pos);300						removed = true;301					}302					if !removed {303						warn!("secret is not enabled for {machine}");304					}305				}306				for machine in &add_machines {307					if target_machines.iter().any(|m| m == machine) {308						warn!("secret is already added to {machine}");309					} else {310						target_machines.push(machine.to_owned());311					}312				}313				if !remove_machines.is_empty() {314					warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");315				}316317				if target_machines.is_empty() {318					info!("no machines left for secret, removing it");319					config.remove_shared(&name);320					return Ok(());321				}322323				if target_machines == initial_machines {324					warn!("secret owners are already correct");325					return Ok(());326				}327328				let identity_holder = if !prefer_identities.is_empty() {329					prefer_identities330						.iter()331						.find(|i| initial_machines.iter().any(|s| s == *i))332				} else {333					secret.owners.first()334				};335				let Some(identity_holder) = identity_holder else {336					bail!("no available holder found");337				};338				let target_recipients = futures::stream::iter(&target_machines)339					.then(|m| async { config.key(m).await })340					.collect::<Vec<_>>()341					.await;342				let target_recipients =343					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;344345				let encrypted = config346					.reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)347					.await?;348349				secret.owners = target_machines;350				secret.secret.secret = encrypted;351				config.replace_shared(name, secret);352			}353			Secret::Regenerate { prefer_identities } => {354				{355					let expected_shared_set = config356						.list_configured_shared()357						.await?358						.into_iter()359						.collect::<HashSet<_>>();360					let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();361					for removed in expected_shared_set.difference(&shared_set) {362						error!("secret needs to be generated: {removed}")363					}364				}365				let mut to_remove = Vec::new();366				for name in &config.list_shared() {367					info!("updating secret: {name}");368					let mut data = config.shared_secret(name)?;369					let config_field = &config.config_field;370					let expected_owners: Vec<String> =371						nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);372					if expected_owners.is_empty() {373						warn!("secret was removed from fleet config: {name}, removing from data");374						to_remove.push(name.to_string());375						continue;376					}377					let set = data.owners.iter().collect::<HashSet<_>>();378					let expected_set = expected_owners.iter().collect::<HashSet<_>>();379					let should_remove = set.difference(&expected_set).next().is_some();380					if set != expected_set {381						let owner_dependent: bool =382							nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);383						if !owner_dependent {384							warn!("reencrypting secret '{name}' for new owner set");385							// TODO: force regeneration386							if should_remove {387								warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");388							}389390							let identity_holder = if !prefer_identities.is_empty() {391								prefer_identities392									.iter()393									.find(|i| data.owners.iter().any(|s| s == *i))394							} else {395								data.owners.first()396							};397							let Some(identity_holder) = identity_holder else {398								bail!("no available holder found");399							};400401							let target_recipients = futures::stream::iter(&expected_owners)402								.then(|m| async { config.key(m).await })403								.collect::<Vec<_>>()404								.await;405							let target_recipients =406								target_recipients.into_iter().collect::<Result<Vec<_>>>()?;407408							let encrypted = config409								.reencrypt_on_host(410									identity_holder,411									data.secret.secret,412									target_recipients,413								)414								.await?;415416							data.secret.secret = encrypted;417							data.owners = expected_owners;418							config.replace_shared(name.to_owned(), data);419						} else {420							error!("secret '{name}' should be regenerated manually");421						}422					} else {423						info!("secret data is ok")424					}425				}426				for k in to_remove {427					config.remove_shared(&k);428				}429			}430			Secret::List {} => {431				let _span = info_span!("loading secrets").entered();432				let configured = config.list_configured_shared().await?;433				#[derive(Tabled)]434				struct SecretDisplay {435					#[tabled(rename = "Name")]436					name: String,437					#[tabled(rename = "Owners")]438					owners: String,439				}440				let mut table = vec![];441				for name in configured.iter().cloned() {442					let config = config.clone();443					let expected_owners = config.shared_secret_expected_owners(&name).await?;444					let data = config.shared_secret(&name)?;445					let owners = data446						.owners447						.iter()448						.map(|o| {449							if expected_owners.contains(o) {450								o.green().to_string()451							} else {452								o.red().to_string()453							}454						})455						.collect::<Vec<_>>();456					table.push(SecretDisplay {457						owners: owners.join(", "),458						name,459					})460				}461				info!("loaded\n{}", Table::new(table).to_string())462			}463		}464		Ok(())465	}466}
after · cmds/fleet/src/cmds/secrets/mod.rs
1use crate::{2	command::MyCommand,3	fleetdata::{FleetSecret, FleetSharedSecret},4	host::Config,5	nix_go, nix_go_json,6};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};9use clap::Parser;10use futures::{StreamExt, TryStreamExt};11use owo_colors::OwoColorize;12use std::{13	collections::HashSet,14	io::{self, Cursor, Read},15	path::PathBuf,16	sync::Arc,17};18use tabled::{Table, Tabled};19use tokio::fs::read_to_string;20use tracing::{error, info, info_span, warn};2122#[derive(Parser)]23pub enum Secret {24	/// Force load host keys for all defined hosts25	ForceKeys,26	/// Add secret, data should be provided in stdin27	AddShared {28		/// Secret name29		name: String,30		/// Secret owners31		machines: Vec<String>,32		/// Override secret if already present33		#[clap(long)]34		force: bool,35		/// Secret public part36		#[clap(long)]37		public: Option<String>,38		/// Load public part from specified file39		#[clap(long)]40		public_file: Option<PathBuf>,4142		/// Create a notification on secret expiration43		#[clap(long)]44		expires_at: Option<DateTime<Utc>>,4546		/// Secret with this name already exists, override its value while keeping the same owners.47		#[clap(long)]48		re_add: bool,49	},50	/// Add secret, data should be provided in stdin51	Add {52		/// Secret name53		name: String,54		/// Secret owners55		machine: String,56		/// Override secret if already present57		#[clap(long)]58		force: bool,59		#[clap(long)]60		public: Option<String>,61		#[clap(long)]62		public_file: Option<PathBuf>,63	},64	/// Read secret from remote host, requires sudo on said host65	Read {66		name: String,67		machine: String,68		#[clap(long)]69		plaintext: bool,70	},71	UpdateShared {72		name: String,7374		#[clap(long)]75		machines: Option<Vec<String>>,7677		#[clap(long)]78		add_machines: Vec<String>,79		#[clap(long)]80		remove_machines: Vec<String>,8182		/// Which host should we use to decrypt83		#[clap(long)]84		prefer_identities: Vec<String>,85	},86	Regenerate {87		/// Which host should we use to decrypt, in case if reencryption is required, without88		/// regeneration89		#[clap(long)]90		prefer_identities: Vec<String>,91	},92	List {},93	InvokeGenerator,94}9596impl Secret {97	pub async fn run(self, config: &Config) -> Result<()> {98		match self {99			Secret::InvokeGenerator => {100				let config_field = &config.config_unchecked_field;101102				let secret =103					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);104				let generate_impure = nix_go!(secret.generateImpure);105				let on = nix_go!(generate_impure.on);106				let call_package = nix_go!(107					config_field.buildableSystems(Obj {108						localSystem: { config.local_system.clone() }109					})[on]110						.config111						.nixpkgs112						.resolvedPkgs113						.callPackage114				);115				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));116				let built = &generator.build().await?["out"];117				let mut nix = MyCommand::new("nix");118				let on: String = on.as_json().await?;119				nix.arg("copy")120					.arg("--substitute-on-destination")121					.comparg("--to", format!("ssh-ng://{on}"))122					.arg(built);123				nix.run_nix().await?;124125				let session = config.host(&on).await?;126127				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);128				dbg!(&owners);129130				let mut recipients = String::new();131				for owner in owners {132					let key = config.key(&owner).await?;133					recipients.push_str(&format!("-r \"{key}\" "));134				}135				recipients.push_str("-e");136137				// FIXME: security: created directory might be accessible to other users138				// This shouldn't be much of a concern, as data is encrypted right after creation, yet139				// still better to have.140				let tempdir = session.mktemp_dir().await?;141142				let mut gen = session.cmd(built).await?;143				gen.env("rageArgs", recipients).env("out", &tempdir);144				gen.run().await?;145146				{147					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;148					ensure!(marker == "SUCCESS", "generation not succeeded");149				}150151				let public = session152					.read_file_bin(format!("{tempdir}/public"))153					.await154					.ok();155				let secret = session156					.read_file_bin(format!("{tempdir}/secret"))157					.await158					.ok();159				if let Some(secret) = &secret {160					ensure!(161						age::Decryptor::new(Cursor::new(&secret)).is_ok(),162						"builder produced non-encrypted value as secret, this is highly insecure"163					);164				}165				dbg!(&secret);166				// // .as_json().await?;167				// dbg!(&built);168			}169			Secret::ForceKeys => {170				for host in config.list_hosts().await? {171					if config.should_skip(&host.name) {172						continue;173					}174					config.key(&host.name).await?;175				}176			}177			Secret::AddShared {178				mut machines,179				name,180				force,181				public,182				public_file,183				expires_at,184				re_add,185			} => {186				let exists = config.has_shared(&name);187				if exists && !force && !re_add {188					bail!("secret already defined");189				}190				if re_add {191					// Fixme: use clap to limit this usage192					ensure!(!force, "--force and --readd are not compatible");193					ensure!(exists, "secret doesn't exists");194					ensure!(195						machines.is_empty(),196						"you can't use machines argument for --readd"197					);198					let shared = config.shared_secret(&name)?;199					machines = shared.owners;200				}201202				let recipients = futures::stream::iter(machines.iter())203					.then(|m| config.recipient(m))204					.try_collect::<Vec<_>>()205					.await?;206207				let secret = {208					let mut input = vec![];209					io::stdin().read_to_end(&mut input)?;210211					if input.is_empty() {212						input213					} else {214						let mut encrypted = vec![];215						let recipients = recipients216							.iter()217							.cloned()218							.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)219							.collect();220						let mut encryptor = age::Encryptor::with_recipients(recipients)221							.ok_or_else(|| anyhow!("no recipients provided"))?222							.wrap_output(&mut encrypted)?;223						io::copy(&mut Cursor::new(input), &mut encryptor)?;224						encryptor.finish()?;225						encrypted226					}227				};228				config.replace_shared(229					name,230					FleetSharedSecret {231						owners: machines,232						secret: FleetSecret {233							created_at: Utc::now(),234							expires_at,235							secret,236							public: match (public, public_file) {237								(Some(v), None) => Some(v),238								(None, Some(v)) => Some(read_to_string(v).await?),239								(Some(_), Some(_)) => {240									bail!("only public or public_file should be set")241								}242								(None, None) => None,243							},244						},245					},246				);247			}248			Secret::Add {249				machine,250				name,251				force,252				public,253				public_file,254			} => {255				let recipient = config.recipient(&machine).await?;256257				let secret = {258					let mut input = vec![];259					io::stdin().read_to_end(&mut input)?;260					if input.is_empty() {261						bail!("no data provided")262					}263264					let mut encrypted = vec![];265					let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;266					let mut encryptor = age::Encryptor::with_recipients(vec![recipient])267						.expect("recipients provided")268						.wrap_output(&mut encrypted)?;269					io::copy(&mut Cursor::new(input), &mut encryptor)?;270					encryptor.finish()?;271					encrypted272				};273274				if config.has_secret(&machine, &name) && !force {275					bail!("secret already defined");276				}277				config.insert_secret(278					&machine,279					name,280					FleetSecret {281						created_at: Utc::now(),282						expires_at: None,283						secret,284						public: match (public, public_file) {285							(Some(v), None) => Some(v),286							(None, Some(v)) => Some(std::fs::read_to_string(v)?),287							(Some(_), Some(_)) => bail!("only public or public_file should be set"),288							(None, None) => None,289						},290					},291				);292			}293			// TODO: Instead of using sudo, decode secret on remote machine294			#[allow(clippy::await_holding_refcell_ref)]295			Secret::Read {296				name,297				machine,298				plaintext,299			} => {300				let secret = config.host_secret(&machine, &name)?;301				if secret.secret.is_empty() {302					bail!("no secret {name}");303				}304				let host = config.host(&machine).await?;305				let data = host.decrypt(secret.secret).await?;306				if plaintext {307					let s = String::from_utf8(data).context("output is not utf8")?;308					print!("{s}");309				} else {310					println!("{}", z85::encode(&data));311				}312			}313			Secret::UpdateShared {314				name,315				machines,316				mut add_machines,317				mut remove_machines,318				prefer_identities,319			} => {320				if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {321					bail!("no operation");322				}323324				let mut secret = config.shared_secret(&name)?;325				if secret.secret.secret.is_empty() {326					bail!("no secret");327				}328329				let initial_machines = secret.owners.clone();330				let mut target_machines = secret.owners.clone();331				info!("Currently encrypted for {initial_machines:?}");332333				// ensure!(machines.is_some() || !add_machines.is_empty() || )334				if let Some(machines) = machines {335					ensure!(336						add_machines.is_empty() && remove_machines.is_empty(),337						"can't combine --machines and --add-machines/--remove-machines"338					);339					let target = initial_machines.iter().collect::<HashSet<_>>();340					let source = machines.iter().collect::<HashSet<_>>();341					for removed in target.difference(&source) {342						remove_machines.push((*removed).clone());343					}344					for added in source.difference(&target) {345						add_machines.push((*added).clone());346					}347				}348349				for machine in &remove_machines {350					let mut removed = false;351					while let Some(pos) = target_machines.iter().position(|m| m == machine) {352						target_machines.swap_remove(pos);353						removed = true;354					}355					if !removed {356						warn!("secret is not enabled for {machine}");357					}358				}359				for machine in &add_machines {360					if target_machines.iter().any(|m| m == machine) {361						warn!("secret is already added to {machine}");362					} else {363						target_machines.push(machine.to_owned());364					}365				}366				if !remove_machines.is_empty() {367					warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");368				}369370				if target_machines.is_empty() {371					info!("no machines left for secret, removing it");372					config.remove_shared(&name);373					return Ok(());374				}375376				if target_machines == initial_machines {377					warn!("secret owners are already correct");378					return Ok(());379				}380381				let identity_holder = if !prefer_identities.is_empty() {382					prefer_identities383						.iter()384						.find(|i| initial_machines.iter().any(|s| s == *i))385				} else {386					secret.owners.first()387				};388				let Some(identity_holder) = identity_holder else {389					bail!("no available holder found");390				};391				let target_recipients = futures::stream::iter(&target_machines)392					.then(|m| async { config.key(m).await })393					.collect::<Vec<_>>()394					.await;395				let target_recipients =396					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;397398				let encrypted = config399					.reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)400					.await?;401402				secret.owners = target_machines;403				secret.secret.secret = encrypted;404				config.replace_shared(name, secret);405			}406			Secret::Regenerate { prefer_identities } => {407				{408					let expected_shared_set = config409						.list_configured_shared()410						.await?411						.into_iter()412						.collect::<HashSet<_>>();413					let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();414					for removed in expected_shared_set.difference(&shared_set) {415						error!("secret needs to be generated: {removed}")416					}417				}418				let mut to_remove = Vec::new();419				for name in &config.list_shared() {420					info!("updating secret: {name}");421					let mut data = config.shared_secret(name)?;422					let config_field = &config.config_field;423					let expected_owners: Vec<String> =424						nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);425					if expected_owners.is_empty() {426						warn!("secret was removed from fleet config: {name}, removing from data");427						to_remove.push(name.to_string());428						continue;429					}430					let set = data.owners.iter().collect::<HashSet<_>>();431					let expected_set = expected_owners.iter().collect::<HashSet<_>>();432					let should_remove = set.difference(&expected_set).next().is_some();433					if set != expected_set {434						let owner_dependent: bool =435							nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);436						if !owner_dependent {437							warn!("reencrypting secret '{name}' for new owner set");438							// TODO: force regeneration439							if should_remove {440								warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");441							}442443							let identity_holder = if !prefer_identities.is_empty() {444								prefer_identities445									.iter()446									.find(|i| data.owners.iter().any(|s| s == *i))447							} else {448								data.owners.first()449							};450							let Some(identity_holder) = identity_holder else {451								bail!("no available holder found");452							};453454							let target_recipients = futures::stream::iter(&expected_owners)455								.then(|m| async { config.key(m).await })456								.collect::<Vec<_>>()457								.await;458							let target_recipients =459								target_recipients.into_iter().collect::<Result<Vec<_>>>()?;460461							let encrypted = config462								.reencrypt_on_host(463									identity_holder,464									data.secret.secret,465									target_recipients,466								)467								.await?;468469							data.secret.secret = encrypted;470							data.owners = expected_owners;471							config.replace_shared(name.to_owned(), data);472						} else {473							error!("secret '{name}' should be regenerated manually");474						}475					} else {476						info!("secret data is ok")477					}478				}479				for k in to_remove {480					config.remove_shared(&k);481				}482			}483			Secret::List {} => {484				let _span = info_span!("loading secrets").entered();485				let configured = config.list_configured_shared().await?;486				#[derive(Tabled)]487				struct SecretDisplay {488					#[tabled(rename = "Name")]489					name: String,490					#[tabled(rename = "Owners")]491					owners: String,492				}493				let mut table = vec![];494				for name in configured.iter().cloned() {495					let config = config.clone();496					let expected_owners = config.shared_secret_expected_owners(&name).await?;497					let data = config.shared_secret(&name)?;498					let owners = data499						.owners500						.iter()501						.map(|o| {502							if expected_owners.contains(o) {503								o.green().to_string()504							} else {505								o.red().to_string()506							}507						})508						.collect::<Vec<_>>();509					table.push(SecretDisplay {510						owners: owners.join(", "),511						name,512					})513				}514				info!("loaded\n{}", Table::new(table).to_string())515			}516		}517		Ok(())518	}519}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
 use std::{
 	collections::HashMap,
 	ffi::OsStr,
+	pin,
 	process::Stdio,
 	sync::{Arc, Mutex},
 	task::Poll,
@@ -10,7 +11,7 @@
 use futures::StreamExt;
 use itertools::Either;
 use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
 use regex::Regex;
 use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
 	ssh_session: Option<Arc<Session>>,
 }
 impl MyCommand {
+	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+		assert!(!cmd.as_ref().is_empty());
+		Self {
+			command: ostoutf8(cmd),
+			args: vec![],
+			env: vec![],
+			ssh_session: Some(session),
+		}
+	}
 	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
 		assert!(!cmd.as_ref().is_empty());
 		Self {
@@ -66,6 +76,29 @@
 		out.extend(self.args);
 		out
 	}
+
+	/// Translates environment variables into env command execution.
+	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+	///
+	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+	/// Figure out some way to transfer environment using stdio?
+	fn translate_env_into_env(self) -> Self {
+		if self.env.is_empty() {
+			return self;
+		}
+		let mut out = Self::new("env");
+		if let Some(session) = self.ssh_session {
+			out = out.ssh_session(session);
+		}
+		for (k, v) in self.env {
+			assert!(!k.contains('='));
+			out.arg(format!("{k}={v}"));
+		}
+		out.arg(self.command);
+		out.args(self.args);
+
+		out
+	}
 	fn into_string(self) -> String {
 		let mut out = String::new();
 		if !self.env.is_empty() {
@@ -98,7 +131,7 @@
 	}
 	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
 		Ok(if let Some(session) = self.ssh_session.clone() {
-			let cmd = self.into_command();
+			let cmd = self.translate_env_into_env().into_command();
 			Either::Right(
 				cmd.over_ssh(session)
 					.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
 		self.arg(value);
 		self
 	}
+	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.env
+			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+		self
+	}
 	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
 		for arg in args.into_iter() {
 			let arg = arg.as_ref();
@@ -133,9 +171,10 @@
 		}
 		self
 	}
-	pub fn sudo(self) -> Self {
+	pub fn sudo(mut self) -> Self {
 		if std::env::var_os("NO_SUDO").is_some() {
 			let mut out = Self::new("su");
+			out.ssh_session = self.ssh_session.take();
 			out.arg("-c").arg(self.into_string());
 			out
 		} else {
@@ -144,27 +183,38 @@
 			out
 		}
 	}
-	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+		self.ssh_session = Some(on);
+		self
+	}
+	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
 		let mut out = Self::new("ssh");
+		out.ssh_session = self.ssh_session.take();
 		out.arg(on).arg("--");
 		out.arg(self.into_string());
 		out
 	}
-	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
-		self.ssh_session = Some(session);
-		self
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		run_nix_inner(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		match cmd {
+			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(())
 	}
 	pub async fn run_string(self) -> Result<String> {
+		let bytes = self.run_bytes().await?;
+		Ok(String::from_utf8(bytes)?)
+	}
+	pub async fn run_bytes(self) -> Result<Vec<u8>> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		let v = match cmd {
+			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(v)
 	}
 
@@ -172,7 +222,8 @@
 		let str = self.clone().into_string();
 		let mut cmd = self.into_command();
 		cmd.arg("--log-format").arg("internal-json");
-		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+		Ok(String::from_utf8(bytes)?)
 	}
 	pub async fn run_nix(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -198,7 +249,7 @@
 	str: String,
 	cmd: Command,
 	handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
 	Ok(run_nix_inner_raw(str, cmd, true, handler, None)
 		.await?
 		.expect("has out"))
@@ -208,6 +259,24 @@
 	assert!(v.is_none());
 	Ok(())
 }
+async fn run_nix_inner_stdout_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+		.await?
+		.expect("has out"))
+}
+async fn run_nix_inner_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<()> {
+	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+	assert!(v.is_none());
+	Ok(())
+}
 
 pub trait Handler: Send {
 	fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
 	want_stdout: bool,
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
 	let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
 		}
 	}
 
-	Ok(out_buf.map(String::from_utf8).transpose()?)
+	Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+	str: String,
+	mut cmd: OwningCommand<Arc<Session>>,
+	want_stdout: bool,
+	err_handler: &mut dyn Handler,
+	mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+	cmd.stderr(openssh::Stdio::piped());
+	cmd.stdout(openssh::Stdio::piped());
+	let mut child = cmd.spawn().await?;
+	let mut stderr = child.stderr().take().unwrap();
+	let stdout = child.stdout().take().unwrap();
+	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+	let mut ob = want_stdout
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ol = (!want_stdout)
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+	// while let Some(line) = read.next().await? {}
+
+	let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+	let mut wait_future = pin::pin!(child.wait());
+	loop {
+		select! {
+			e = err.next() => {
+				if let Some(e) = e {
+					let e = e?;
+					err_handler.handle_line(&e);
+				}
+			},
+			o = ob.next() => {
+				if let Some(o) = o {
+					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+				}
+			},
+			o = ol.next() => {
+				if let Some(o) = o {
+					let o = o?;
+					if let Some(out) = out_handler.as_mut() {
+						out.handle_line(&o)
+					} else {
+						err_handler.handle_line(&o)
+					}
+					// out_handler.handle_info(&o);
+				}
+			},
+			code = &mut wait_future => {
+				let code = code?;
+				if !code.success() {
+					anyhow::bail!("command '{str}' failed with status {}", code);
+				}
+				break;
+			}
+		}
+	}
+
+	Ok(out_buf)
 }
 
 pub trait ErrorRecorder: Send {
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }