git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+	command::MyCommand,
 	fleetdata::{FleetSecret, FleetSharedSecret},
 	host::Config,
 	nix_go, nix_go_json,
@@ -12,6 +13,7 @@
 	collections::HashSet,
 	io::{self, Cursor, Read},
 	path::PathBuf,
+	sync::Arc,
 };
 use tabled::{Table, Tabled};
 use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
 			Secret::InvokeGenerator => {
 				let config_field = &config.config_unchecked_field;
 
-				let generate_impure =
-					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let secret =
+					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+				let generate_impure = nix_go!(secret.generateImpure);
 				let on = nix_go!(generate_impure.on);
 				let call_package = nix_go!(
 					config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
 					})[on]
 						.config
 						.nixpkgs
-						.pkgs
+						.resolvedPkgs
 						.callPackage
 				);
-				let generator = nix_go!(call_package(generate_impure.generator));
-				let built = generator.build().await?;
-				// .as_json().await?;
-				dbg!(&built);
+				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+				let built = &generator.build().await?["out"];
+				let mut nix = MyCommand::new("nix");
+				let on: String = on.as_json().await?;
+				nix.arg("copy")
+					.arg("--substitute-on-destination")
+					.comparg("--to", format!("ssh-ng://{on}"))
+					.arg(built);
+				nix.run_nix().await?;
+
+				let session = config.host(&on).await?;
+
+				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+				dbg!(&owners);
+
+				let mut recipients = String::new();
+				for owner in owners {
+					let key = config.key(&owner).await?;
+					recipients.push_str(&format!("-r \"{key}\" "));
+				}
+				recipients.push_str("-e");
+
+				// FIXME: security: created directory might be accessible to other users
+				// This shouldn't be much of a concern, as data is encrypted right after creation, yet
+				// still better to have.
+				let tempdir = session.mktemp_dir().await?;
+
+				let mut gen = session.cmd(built).await?;
+				gen.env("rageArgs", recipients).env("out", &tempdir);
+				gen.run().await?;
+
+				{
+					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+					ensure!(marker == "SUCCESS", "generation not succeeded");
+				}
+
+				let public = session
+					.read_file_bin(format!("{tempdir}/public"))
+					.await
+					.ok();
+				let secret = session
+					.read_file_bin(format!("{tempdir}/secret"))
+					.await
+					.ok();
+				if let Some(secret) = &secret {
+					ensure!(
+						age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+						"builder produced non-encrypted value as secret, this is highly insecure"
+					);
+				}
+				dbg!(&secret);
+				// // .as_json().await?;
+				// dbg!(&built);
 			}
 			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
 				if secret.secret.is_empty() {
 					bail!("no secret {name}");
 				}
-				let data = config.decrypt_on_host(&machine, secret.secret).await?;
+				let host = config.host(&machine).await?;
+				let data = host.decrypt(secret.secret).await?;
 				if plaintext {
 					let s = String::from_utf8(data).context("output is not utf8")?;
 					print!("{s}");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
before · cmds/fleet/src/command.rs
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::{anyhow, Result};10use futures::StreamExt;11use itertools::Either;12use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use regex::Regex;15use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};18use tracing::{info, info_span, warn, Span};19use tracing_indicatif::span_ext::IndicatifSpanExt;2021fn escape_bash(input: &str, out: &mut String) {22	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";23	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {24		out.push_str(input);25		return;26	}27	out.push('\'');28	for (i, v) in input.split('\'').enumerate() {29		if i != 0 {30			out.push_str("'\"'\"'");31		}32		out.push_str(v);33	}34	out.push('\'');35}36fn ostoutf8(os: impl AsRef<OsStr>) -> String {37	os.as_ref().to_str().expect("non-utf8 data").to_owned()38}39#[derive(Clone)]40pub struct MyCommand {41	command: String,42	args: Vec<String>,43	env: Vec<(String, String)>,44	ssh_session: Option<Arc<Session>>,45}46impl MyCommand {47	pub fn new(cmd: impl AsRef<OsStr>) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: None,54		}55	}56	fn into_args(self) -> Vec<String> {57		let mut out = Vec::new();58		if !self.env.is_empty() {59			out.push("env".to_owned());60			for (k, v) in self.env {61				assert!(!k.contains('='));62				out.push(format!("{k}={v}"));63			}64		}65		out.push(self.command);66		out.extend(self.args);67		out68	}69	fn into_string(self) -> String {70		let mut out = String::new();71		if !self.env.is_empty() {72			out.push_str("env");73			for (k, v) in self.env {74				out.push(' ');75				assert!(!k.contains('='));76				escape_bash(&k, &mut out);77				out.push('=');78				escape_bash(&v, &mut out);79			}80		}81		if !out.is_empty() {82			out.push(' ');83		}84		escape_bash(&self.command, &mut out);85		for arg in self.args {86			out.push(' ');87			escape_bash(&arg, &mut out);88		}89		out90	}91	fn into_command(self) -> Command {92		let mut out = Command::new(self.command);93		out.args(self.args);94		for (k, v) in self.env {95			out.env(k, v);96		}97		out98	}99	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100		Ok(if let Some(session) = self.ssh_session.clone() {101			let cmd = self.into_command();102			Either::Right(103				cmd.over_ssh(session)104					.map_err(|e| anyhow!("ssh error: {e}"))?,105			)106		} else {107			let cmd = self.into_command();108			Either::Left(cmd)109		})110	}111	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {112		let arg = arg.as_ref();113		self.args.push(ostoutf8(arg));114		self115	}116	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {117		let arg = arg.as_ref();118		let value = value.as_ref();119		let arg = ostoutf8(arg);120		let value = ostoutf8(value);121		self.arg(format!("{arg}={value}"));122		self123	}124	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {125		self.arg(arg);126		self.arg(value);127		self128	}129	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130		for arg in args.into_iter() {131			let arg = arg.as_ref();132			self.args.push(ostoutf8(arg));133		}134		self135	}136	pub fn sudo(self) -> Self {137		if std::env::var_os("NO_SUDO").is_some() {138			let mut out = Self::new("su");139			out.arg("-c").arg(self.into_string());140			out141		} else {142			let mut out = Self::new("sudo");143			out.args(self.into_args());144			out145		}146	}147	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {148		let mut out = Self::new("ssh");149		out.arg(on).arg("--");150		out.arg(self.into_string());151		out152	}153	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154		self.ssh_session = Some(session);155		self156	}157158	pub async fn run(self) -> Result<()> {159		let str = self.clone().into_string();160		let cmd = self.into_command();161		run_nix_inner(str, cmd, &mut PlainHandler).await?;162		Ok(())163	}164	pub async fn run_string(self) -> Result<String> {165		let str = self.clone().into_string();166		let cmd = self.into_command();167		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;168		Ok(v)169	}170171	pub async fn run_nix_string(self) -> Result<String> {172		let str = self.clone().into_string();173		let mut cmd = self.into_command();174		cmd.arg("--log-format").arg("internal-json");175		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await176	}177	pub async fn run_nix(self) -> Result<()> {178		let str = self.clone().into_string();179		let mut cmd = self.into_command();180		cmd.arg("--log-format").arg("internal-json");181		cmd.stdout(Stdio::inherit());182		run_nix_inner(str, cmd, &mut NixHandler::default()).await183	}184}185186struct EmptyAsyncRead;187impl AsyncRead for EmptyAsyncRead {188	fn poll_read(189		self: std::pin::Pin<&mut Self>,190		_cx: &mut std::task::Context<'_>,191		_buf: &mut tokio::io::ReadBuf<'_>,192	) -> Poll<std::io::Result<()>> {193		Poll::Pending194	}195}196197async fn run_nix_inner_stdout(198	str: String,199	cmd: Command,200	handler: &mut dyn Handler,201) -> Result<String> {202	Ok(run_nix_inner_raw(str, cmd, true, handler, None)203		.await?204		.expect("has out"))205}206async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {207	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;208	assert!(v.is_none());209	Ok(())210}211212pub trait Handler: Send {213	fn handle_line(&mut self, e: &str);214}215216pub struct ClonableHandler<H>(Arc<Mutex<H>>);217impl<H> Clone for ClonableHandler<H> {218	fn clone(&self) -> Self {219		Self(self.0.clone())220	}221}222impl<H> ClonableHandler<H> {223	pub fn new(inner: H) -> Self {224		Self(Arc::new(Mutex::new(inner)))225	}226}227impl<H: Handler> Handler for ClonableHandler<H> {228	fn handle_line(&mut self, e: &str) {229		self.0.lock().unwrap().handle_line(e)230	}231}232233struct PlainHandler;234impl Handler for PlainHandler {235	fn handle_line(&mut self, e: &str) {236		info!(target: "log", "{e}");237	}238}239240pub struct NoopHandler;241impl Handler for NoopHandler {242	fn handle_line(&mut self, _e: &str) {}243}244245#[derive(Default)]246pub struct NixHandler {247	spans: HashMap<u64, Span>,248}249fn process_message(m: &str) -> String {250	static OSC_CLEANER: Lazy<Regex> =251		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());252	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());253	let m = OSC_CLEANER.replace_all(m, "");254	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,255	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.256	DETABBER.replace_all(m.as_ref(), "  ").to_string()257}258impl Handler for NixHandler {259	fn handle_line(&mut self, e: &str) {260		if let Some(e) = e.strip_prefix("@nix ") {261			let log: NixLog = match serde_json::from_str(e) {262				Ok(l) => l,263				Err(err) => {264					warn!("failed to parse nix log line {:?}: {}", e, err);265					return;266				}267			};268			match log {269				NixLog::Msg { msg, raw_msg, .. } => {270					#[allow(clippy::nonminimal_bool)]271					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))272					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")273					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {274						if let Some(raw_msg) = raw_msg {275							if !msg.is_empty() {276								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())277							} else {278								info!(target: "nix", "{}", raw_msg.trim_end())279							}280						} else {281							info!(target: "nix", "{}", msg.trim_end())282						}283					}284				}285				NixLog::Start {286					ref fields,287					typ,288					id,289					..290				} if typ == 105 && !fields.is_empty() => {291					if let [LogField::String(drv), ..] = &fields[..] {292						let mut drv = drv.as_str();293						if let Some(pkg) = drv.strip_prefix("/nix/store/") {294							let mut it = pkg.splitn(2, '-');295							it.next();296							if let Some(pkg) = it.next() {297								drv = pkg;298							}299						}300						info!(target: "nix","building {}", drv);301						let span = info_span!("build", drv);302						span.pb_start();303						self.spans.insert(id, span);304					} else {305						warn!("bad build log: {:?}", log)306					}307				}308				NixLog::Start {309					ref fields,310					typ,311					id,312					..313				} if typ == 100 && fields.len() >= 3 => {314					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =315						&fields[..]316					{317						let mut drv = drv.as_str();318319						if let Some(pkg) = drv.strip_prefix("/nix/store/") {320							let mut it = pkg.splitn(2, '-');321							it.next();322							if let Some(pkg) = it.next() {323								drv = pkg;324							}325						}326						// info!(target: "nix","copying {} {} -> {}", drv, from, to);327						let span = info_span!("copy", from, to, drv);328						span.pb_start();329						self.spans.insert(id, span);330					} else {331						warn!("bad copy log: {:?}", log)332					}333				}334				NixLog::Start { text, typ, id, .. }335					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>336				{337					if !text.is_empty()338						&& text != "querying info about missing paths"339						&& text != "copying 0 paths"340						// Too much spam on lazy-trees branch341						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))342					{343						let span = info_span!("job");344						span.pb_start();345						span.pb_set_message(&process_message(text.trim()));346						self.spans.insert(id, span);347						info!(target: "nix", "{}", text);348					}349				}350				NixLog::Start {351					text,352					level: 0,353					typ: 108,354					..355				} if text.is_empty() => {356					// Cache lookup? Coupled with copy log357				}358				NixLog::Start {359					text,360					level: 4,361					typ: 109,362					..363				} if text.starts_with("querying info about ") => {364					// Cache lookup365				}366				NixLog::Start {367					text,368					level: 4,369					typ: 101,370					..371				} if text.starts_with("downloading ") => {372					// NAR downloading, coupled with copy log373				}374				NixLog::Start {375					text,376					level: 1,377					typ: 111,378					..379				} if text.starts_with("waiting for a machine to build ") => {380					// Useless repeating notification about build381				}382				NixLog::Start {383					text,384					level: 3,385					typ: 111,386					..387				} if text.starts_with("resolved derivation: ") => {388					// CA resolved389				}390				NixLog::Start {391					text,392					level: 1,393					typ: 111,394					id,395					..396				} if text.starts_with("waiting for lock on ") => {397					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();398					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {399						drv = txt;400					}401					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {402						drv = txt;403					}404					if let Some(txt) = drv.split("', '").next() {405						drv = txt;406					}407					if let Some(pkg) = drv.strip_prefix("/nix/store/") {408						let mut it = pkg.splitn(2, '-');409						it.next();410						if let Some(pkg) = it.next() {411							drv = pkg;412						}413					}414					let span = info_span!("waiting on drv", drv);415					span.pb_start();416					self.spans.insert(id, span);417					// Concurrent build of the same message418				}419				NixLog::Stop { id, .. } => {420					self.spans.remove(&id);421				}422				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {423					if let Some(span) = self.spans.get(&id) {424						if let LogField::String(s) = &fields[0] {425							span.pb_set_message(&process_message(s.trim()));426						} else {427							warn!("bad fields: {fields:?}");428						}429					} else {430						warn!("unknown result id: {id} {typ} {fields:?}");431					}432					// dbg!(fields, id, typ);433				}434				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {435					if let Some(span) = self.spans.get(&id) {436						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =437							&fields[..4]438						{439							span.pb_set_length(*expected);440							span.pb_set_position(*done);441						} else {442							warn!("bad fields: {fields:?}");443						}444					} else {445						// warn!("unknown result id: {id} {typ} {fields:?}");446						// Unaccounted progress.447					}448					// dbg!(fields, id, typ);449				}450				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {451					// Set phase, expected452				}453				_ => warn!("unknown log: {:?}", log),454			};455		} else {456			let e = e.trim();457			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {458				return;459			}460			info!("{e}")461		}462	}463}464465async fn run_nix_inner_raw(466	str: String,467	mut cmd: Command,468	want_stdout: bool,469	err_handler: &mut dyn Handler,470	mut out_handler: Option<&mut dyn Handler>,471) -> Result<Option<String>> {472	cmd.stderr(Stdio::piped());473	cmd.stdout(Stdio::piped());474	let mut child = cmd.spawn()?;475	let mut stderr = child.stderr.take().unwrap();476	let stdout = child.stdout.take().unwrap();477	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());478	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));479	let mut ob = want_stdout480		.then(|| out.take().unwrap())481		.unwrap_or_else(|| Box::new(EmptyAsyncRead));482	let mut ol = (!want_stdout)483		.then(|| out.take().unwrap())484		.unwrap_or_else(|| Box::new(EmptyAsyncRead));485	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());486	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());487488	// while let Some(line) = read.next().await? {}489490	let mut out_buf = if want_stdout { Some(vec![]) } else { None };491	loop {492		select! {493			e = err.next() => {494				if let Some(e) = e {495					let e = e?;496					err_handler.handle_line(&e);497				}498			},499			o = ob.next() => {500				if let Some(o) = o {501					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);502				}503			},504			o = ol.next() => {505				if let Some(o) = o {506					let o = o?;507					if let Some(out) = out_handler.as_mut() {508						out.handle_line(&o)509					} else {510						err_handler.handle_line(&o)511					}512					// out_handler.handle_info(&o);513				}514			},515			code = child.wait() => {516				let code = code?;517				if !code.success() {518					anyhow::bail!("command '{str}' failed with status {}", code);519				}520				break;521			}522		}523	}524525	Ok(out_buf.map(String::from_utf8).transpose()?)526}527528pub trait ErrorRecorder: Send {529	/// Return true to discard message from logging530	fn push_message(&mut self, msg: &str) -> bool;531}532533#[derive(Debug)]534enum LogField {535	String(String),536	Num(u64),537}538539impl<'de> Deserialize<'de> for LogField {540	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>541	where542		D: serde::Deserializer<'de>,543	{544		struct StringOrNum;545		impl<'de> Visitor<'de> for StringOrNum {546			type Value = LogField;547548			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {549				write!(f, "string or unsigned")550			}551552			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>553			where554				E: serde::de::Error,555			{556				Ok(LogField::String(v.to_owned()))557			}558559			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>560			where561				E: serde::de::Error,562			{563				Ok(LogField::Num(v))564			}565		}566567		deserializer.deserialize_any(StringOrNum)568	}569}570571#[derive(Deserialize, Debug)]572#[serde(rename_all = "camelCase", tag = "action")]573#[allow(dead_code)]574enum NixLog {575	Msg {576		level: u32,577		msg: String,578		raw_msg: Option<String>,579	},580	Start {581		id: u64,582		level: u32,583		#[serde(default)]584		fields: Vec<LogField>,585		text: String,586		#[serde(rename = "type")]587		typ: u32,588	},589	Stop {590		id: u64,591	},592	Result {593		id: u64,594		#[serde(rename = "type")]595		typ: u32,596		#[serde(default)]597		fields: Vec<LogField>,598	},599}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }