git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/secrets/mod.rs
1use crate::{2	fleetdata::{FleetSecret, FleetSharedSecret},3	host::Config,4	nix_go, nix_go_json,5};6use anyhow::{anyhow, bail, ensure, Context, Result};7use chrono::{DateTime, Utc};8use clap::Parser;9use futures::{StreamExt, TryStreamExt};10use owo_colors::OwoColorize;11use std::{12	collections::HashSet,13	io::{self, Cursor, Read},14	path::PathBuf,15};16use tabled::{Table, Tabled};17use tokio::fs::read_to_string;18use tracing::{error, info, info_span, warn};1920#[derive(Parser)]21pub enum Secret {22	/// Force load host keys for all defined hosts23	ForceKeys,24	/// Add secret, data should be provided in stdin25	AddShared {26		/// Secret name27		name: String,28		/// Secret owners29		machines: Vec<String>,30		/// Override secret if already present31		#[clap(long)]32		force: bool,33		/// Secret public part34		#[clap(long)]35		public: Option<String>,36		/// Load public part from specified file37		#[clap(long)]38		public_file: Option<PathBuf>,3940		/// Create a notification on secret expiration41		#[clap(long)]42		expires_at: Option<DateTime<Utc>>,4344		/// Secret with this name already exists, override its value while keeping the same owners.45		#[clap(long)]46		re_add: bool,47	},48	/// Add secret, data should be provided in stdin49	Add {50		/// Secret name51		name: String,52		/// Secret owners53		machine: String,54		/// Override secret if already present55		#[clap(long)]56		force: bool,57		#[clap(long)]58		public: Option<String>,59		#[clap(long)]60		public_file: Option<PathBuf>,61	},62	/// Read secret from remote host, requires sudo on said host63	Read {64		name: String,65		machine: String,66		#[clap(long)]67		plaintext: bool,68	},69	UpdateShared {70		name: String,7172		#[clap(long)]73		machines: Option<Vec<String>>,7475		#[clap(long)]76		add_machines: Vec<String>,77		#[clap(long)]78		remove_machines: Vec<String>,7980		/// Which host should we use to decrypt81		#[clap(long)]82		prefer_identities: Vec<String>,83	},84	Regenerate {85		/// Which host should we use to decrypt, in case if reencryption is required, without86		/// regeneration87		#[clap(long)]88		prefer_identities: Vec<String>,89	},90	List {},91	InvokeGenerator,92}9394impl Secret {95	pub async fn run(self, config: &Config) -> Result<()> {96		match self {97			Secret::InvokeGenerator => {98				let config_field = &config.config_unchecked_field;99100				let generate_impure =101					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);102				let on = nix_go!(generate_impure.on);103				let call_package = nix_go!(104					config_field.buildableSystems(Obj {105						localSystem: { config.local_system.clone() }106					})[on]107						.config108						.nixpkgs109						.pkgs110						.callPackage111				);112				let generator = nix_go!(call_package(generate_impure.generator));113				let built = generator.build().await?;114				// .as_json().await?;115				dbg!(&built);116			}117			Secret::ForceKeys => {118				for host in config.list_hosts().await? {119					if config.should_skip(&host.name) {120						continue;121					}122					config.key(&host.name).await?;123				}124			}125			Secret::AddShared {126				mut machines,127				name,128				force,129				public,130				public_file,131				expires_at,132				re_add,133			} => {134				let exists = config.has_shared(&name);135				if exists && !force && !re_add {136					bail!("secret already defined");137				}138				if re_add {139					// Fixme: use clap to limit this usage140					ensure!(!force, "--force and --readd are not compatible");141					ensure!(exists, "secret doesn't exists");142					ensure!(143						machines.is_empty(),144						"you can't use machines argument for --readd"145					);146					let shared = config.shared_secret(&name)?;147					machines = shared.owners;148				}149150				let recipients = futures::stream::iter(machines.iter())151					.then(|m| config.recipient(m))152					.try_collect::<Vec<_>>()153					.await?;154155				let secret = {156					let mut input = vec![];157					io::stdin().read_to_end(&mut input)?;158159					if input.is_empty() {160						input161					} else {162						let mut encrypted = vec![];163						let recipients = recipients164							.iter()165							.cloned()166							.map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)167							.collect();168						let mut encryptor = age::Encryptor::with_recipients(recipients)169							.ok_or_else(|| anyhow!("no recipients provided"))?170							.wrap_output(&mut encrypted)?;171						io::copy(&mut Cursor::new(input), &mut encryptor)?;172						encryptor.finish()?;173						encrypted174					}175				};176				config.replace_shared(177					name,178					FleetSharedSecret {179						owners: machines,180						secret: FleetSecret {181							created_at: Utc::now(),182							expires_at,183							secret,184							public: match (public, public_file) {185								(Some(v), None) => Some(v),186								(None, Some(v)) => Some(read_to_string(v).await?),187								(Some(_), Some(_)) => {188									bail!("only public or public_file should be set")189								}190								(None, None) => None,191							},192						},193					},194				);195			}196			Secret::Add {197				machine,198				name,199				force,200				public,201				public_file,202			} => {203				let recipient = config.recipient(&machine).await?;204205				let secret = {206					let mut input = vec![];207					io::stdin().read_to_end(&mut input)?;208					if input.is_empty() {209						bail!("no data provided")210					}211212					let mut encrypted = vec![];213					let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;214					let mut encryptor = age::Encryptor::with_recipients(vec![recipient])215						.expect("recipients provided")216						.wrap_output(&mut encrypted)?;217					io::copy(&mut Cursor::new(input), &mut encryptor)?;218					encryptor.finish()?;219					encrypted220				};221222				if config.has_secret(&machine, &name) && !force {223					bail!("secret already defined");224				}225				config.insert_secret(226					&machine,227					name,228					FleetSecret {229						created_at: Utc::now(),230						expires_at: None,231						secret,232						public: match (public, public_file) {233							(Some(v), None) => Some(v),234							(None, Some(v)) => Some(std::fs::read_to_string(v)?),235							(Some(_), Some(_)) => bail!("only public or public_file should be set"),236							(None, None) => None,237						},238					},239				);240			}241			// TODO: Instead of using sudo, decode secret on remote machine242			#[allow(clippy::await_holding_refcell_ref)]243			Secret::Read {244				name,245				machine,246				plaintext,247			} => {248				let secret = config.host_secret(&machine, &name)?;249				if secret.secret.is_empty() {250					bail!("no secret {name}");251				}252				let data = config.decrypt_on_host(&machine, secret.secret).await?;253				if plaintext {254					let s = String::from_utf8(data).context("output is not utf8")?;255					print!("{s}");256				} else {257					println!("{}", z85::encode(&data));258				}259			}260			Secret::UpdateShared {261				name,262				machines,263				mut add_machines,264				mut remove_machines,265				prefer_identities,266			} => {267				if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {268					bail!("no operation");269				}270271				let mut secret = config.shared_secret(&name)?;272				if secret.secret.secret.is_empty() {273					bail!("no secret");274				}275276				let initial_machines = secret.owners.clone();277				let mut target_machines = secret.owners.clone();278				info!("Currently encrypted for {initial_machines:?}");279280				// ensure!(machines.is_some() || !add_machines.is_empty() || )281				if let Some(machines) = machines {282					ensure!(283						add_machines.is_empty() && remove_machines.is_empty(),284						"can't combine --machines and --add-machines/--remove-machines"285					);286					let target = initial_machines.iter().collect::<HashSet<_>>();287					let source = machines.iter().collect::<HashSet<_>>();288					for removed in target.difference(&source) {289						remove_machines.push((*removed).clone());290					}291					for added in source.difference(&target) {292						add_machines.push((*added).clone());293					}294				}295296				for machine in &remove_machines {297					let mut removed = false;298					while let Some(pos) = target_machines.iter().position(|m| m == machine) {299						target_machines.swap_remove(pos);300						removed = true;301					}302					if !removed {303						warn!("secret is not enabled for {machine}");304					}305				}306				for machine in &add_machines {307					if target_machines.iter().any(|m| m == machine) {308						warn!("secret is already added to {machine}");309					} else {310						target_machines.push(machine.to_owned());311					}312				}313				if !remove_machines.is_empty() {314					warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");315				}316317				if target_machines.is_empty() {318					info!("no machines left for secret, removing it");319					config.remove_shared(&name);320					return Ok(());321				}322323				if target_machines == initial_machines {324					warn!("secret owners are already correct");325					return Ok(());326				}327328				let identity_holder = if !prefer_identities.is_empty() {329					prefer_identities330						.iter()331						.find(|i| initial_machines.iter().any(|s| s == *i))332				} else {333					secret.owners.first()334				};335				let Some(identity_holder) = identity_holder else {336					bail!("no available holder found");337				};338				let target_recipients = futures::stream::iter(&target_machines)339					.then(|m| async { config.key(m).await })340					.collect::<Vec<_>>()341					.await;342				let target_recipients =343					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;344345				let encrypted = config346					.reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)347					.await?;348349				secret.owners = target_machines;350				secret.secret.secret = encrypted;351				config.replace_shared(name, secret);352			}353			Secret::Regenerate { prefer_identities } => {354				{355					let expected_shared_set = config356						.list_configured_shared()357						.await?358						.into_iter()359						.collect::<HashSet<_>>();360					let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();361					for removed in expected_shared_set.difference(&shared_set) {362						error!("secret needs to be generated: {removed}")363					}364				}365				let mut to_remove = Vec::new();366				for name in &config.list_shared() {367					info!("updating secret: {name}");368					let mut data = config.shared_secret(name)?;369					let config_field = &config.config_field;370					let expected_owners: Vec<String> =371						nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);372					if expected_owners.is_empty() {373						warn!("secret was removed from fleet config: {name}, removing from data");374						to_remove.push(name.to_string());375						continue;376					}377					let set = data.owners.iter().collect::<HashSet<_>>();378					let expected_set = expected_owners.iter().collect::<HashSet<_>>();379					let should_remove = set.difference(&expected_set).next().is_some();380					if set != expected_set {381						let owner_dependent: bool =382							nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);383						if !owner_dependent {384							warn!("reencrypting secret '{name}' for new owner set");385							// TODO: force regeneration386							if should_remove {387								warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");388							}389390							let identity_holder = if !prefer_identities.is_empty() {391								prefer_identities392									.iter()393									.find(|i| data.owners.iter().any(|s| s == *i))394							} else {395								data.owners.first()396							};397							let Some(identity_holder) = identity_holder else {398								bail!("no available holder found");399							};400401							let target_recipients = futures::stream::iter(&expected_owners)402								.then(|m| async { config.key(m).await })403								.collect::<Vec<_>>()404								.await;405							let target_recipients =406								target_recipients.into_iter().collect::<Result<Vec<_>>>()?;407408							let encrypted = config409								.reencrypt_on_host(410									identity_holder,411									data.secret.secret,412									target_recipients,413								)414								.await?;415416							data.secret.secret = encrypted;417							data.owners = expected_owners;418							config.replace_shared(name.to_owned(), data);419						} else {420							error!("secret '{name}' should be regenerated manually");421						}422					} else {423						info!("secret data is ok")424					}425				}426				for k in to_remove {427					config.remove_shared(&k);428				}429			}430			Secret::List {} => {431				let _span = info_span!("loading secrets").entered();432				let configured = config.list_configured_shared().await?;433				#[derive(Tabled)]434				struct SecretDisplay {435					#[tabled(rename = "Name")]436					name: String,437					#[tabled(rename = "Owners")]438					owners: String,439				}440				let mut table = vec![];441				for name in configured.iter().cloned() {442					let config = config.clone();443					let expected_owners = config.shared_secret_expected_owners(&name).await?;444					let data = config.shared_secret(&name)?;445					let owners = data446						.owners447						.iter()448						.map(|o| {449							if expected_owners.contains(o) {450								o.green().to_string()451							} else {452								o.red().to_string()453							}454						})455						.collect::<Vec<_>>();456					table.push(SecretDisplay {457						owners: owners.join(", "),458						name,459					})460				}461				info!("loaded\n{}", Table::new(table).to_string())462			}463		}464		Ok(())465	}466}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
 use std::{
 	collections::HashMap,
 	ffi::OsStr,
+	pin,
 	process::Stdio,
 	sync::{Arc, Mutex},
 	task::Poll,
@@ -10,7 +11,7 @@
 use futures::StreamExt;
 use itertools::Either;
 use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
 use regex::Regex;
 use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
 	ssh_session: Option<Arc<Session>>,
 }
 impl MyCommand {
+	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+		assert!(!cmd.as_ref().is_empty());
+		Self {
+			command: ostoutf8(cmd),
+			args: vec![],
+			env: vec![],
+			ssh_session: Some(session),
+		}
+	}
 	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
 		assert!(!cmd.as_ref().is_empty());
 		Self {
@@ -66,6 +76,29 @@
 		out.extend(self.args);
 		out
 	}
+
+	/// Translates environment variables into env command execution.
+	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+	///
+	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+	/// Figure out some way to transfer environment using stdio?
+	fn translate_env_into_env(self) -> Self {
+		if self.env.is_empty() {
+			return self;
+		}
+		let mut out = Self::new("env");
+		if let Some(session) = self.ssh_session {
+			out = out.ssh_session(session);
+		}
+		for (k, v) in self.env {
+			assert!(!k.contains('='));
+			out.arg(format!("{k}={v}"));
+		}
+		out.arg(self.command);
+		out.args(self.args);
+
+		out
+	}
 	fn into_string(self) -> String {
 		let mut out = String::new();
 		if !self.env.is_empty() {
@@ -98,7 +131,7 @@
 	}
 	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
 		Ok(if let Some(session) = self.ssh_session.clone() {
-			let cmd = self.into_command();
+			let cmd = self.translate_env_into_env().into_command();
 			Either::Right(
 				cmd.over_ssh(session)
 					.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
 		self.arg(value);
 		self
 	}
+	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.env
+			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+		self
+	}
 	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
 		for arg in args.into_iter() {
 			let arg = arg.as_ref();
@@ -133,9 +171,10 @@
 		}
 		self
 	}
-	pub fn sudo(self) -> Self {
+	pub fn sudo(mut self) -> Self {
 		if std::env::var_os("NO_SUDO").is_some() {
 			let mut out = Self::new("su");
+			out.ssh_session = self.ssh_session.take();
 			out.arg("-c").arg(self.into_string());
 			out
 		} else {
@@ -144,27 +183,38 @@
 			out
 		}
 	}
-	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+		self.ssh_session = Some(on);
+		self
+	}
+	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
 		let mut out = Self::new("ssh");
+		out.ssh_session = self.ssh_session.take();
 		out.arg(on).arg("--");
 		out.arg(self.into_string());
 		out
 	}
-	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
-		self.ssh_session = Some(session);
-		self
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		run_nix_inner(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		match cmd {
+			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(())
 	}
 	pub async fn run_string(self) -> Result<String> {
+		let bytes = self.run_bytes().await?;
+		Ok(String::from_utf8(bytes)?)
+	}
+	pub async fn run_bytes(self) -> Result<Vec<u8>> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		let v = match cmd {
+			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(v)
 	}
 
@@ -172,7 +222,8 @@
 		let str = self.clone().into_string();
 		let mut cmd = self.into_command();
 		cmd.arg("--log-format").arg("internal-json");
-		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+		Ok(String::from_utf8(bytes)?)
 	}
 	pub async fn run_nix(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -198,7 +249,7 @@
 	str: String,
 	cmd: Command,
 	handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
 	Ok(run_nix_inner_raw(str, cmd, true, handler, None)
 		.await?
 		.expect("has out"))
@@ -208,6 +259,24 @@
 	assert!(v.is_none());
 	Ok(())
 }
+async fn run_nix_inner_stdout_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+		.await?
+		.expect("has out"))
+}
+async fn run_nix_inner_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<()> {
+	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+	assert!(v.is_none());
+	Ok(())
+}
 
 pub trait Handler: Send {
 	fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
 	want_stdout: bool,
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
 	let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
 		}
 	}
 
-	Ok(out_buf.map(String::from_utf8).transpose()?)
+	Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+	str: String,
+	mut cmd: OwningCommand<Arc<Session>>,
+	want_stdout: bool,
+	err_handler: &mut dyn Handler,
+	mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+	cmd.stderr(openssh::Stdio::piped());
+	cmd.stdout(openssh::Stdio::piped());
+	let mut child = cmd.spawn().await?;
+	let mut stderr = child.stderr().take().unwrap();
+	let stdout = child.stdout().take().unwrap();
+	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+	let mut ob = want_stdout
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ol = (!want_stdout)
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+	// while let Some(line) = read.next().await? {}
+
+	let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+	let mut wait_future = pin::pin!(child.wait());
+	loop {
+		select! {
+			e = err.next() => {
+				if let Some(e) = e {
+					let e = e?;
+					err_handler.handle_line(&e);
+				}
+			},
+			o = ob.next() => {
+				if let Some(o) = o {
+					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+				}
+			},
+			o = ol.next() => {
+				if let Some(o) = o {
+					let o = o?;
+					if let Some(out) = out_handler.as_mut() {
+						out.handle_line(&o)
+					} else {
+						err_handler.handle_line(&o)
+					}
+					// out_handler.handle_info(&o);
+				}
+			},
+			code = &mut wait_future => {
+				let code = code?;
+				if !code.success() {
+					anyhow::bail!("command '{str}' failed with status {}", code);
+				}
+				break;
+			}
+		}
+	}
+
+	Ok(out_buf)
 }
 
 pub trait ErrorRecorder: Send {
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }