git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+	command::MyCommand,
 	fleetdata::{FleetSecret, FleetSharedSecret},
 	host::Config,
 	nix_go, nix_go_json,
@@ -12,6 +13,7 @@
 	collections::HashSet,
 	io::{self, Cursor, Read},
 	path::PathBuf,
+	sync::Arc,
 };
 use tabled::{Table, Tabled};
 use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
 			Secret::InvokeGenerator => {
 				let config_field = &config.config_unchecked_field;
 
-				let generate_impure =
-					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let secret =
+					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+				let generate_impure = nix_go!(secret.generateImpure);
 				let on = nix_go!(generate_impure.on);
 				let call_package = nix_go!(
 					config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
 					})[on]
 						.config
 						.nixpkgs
-						.pkgs
+						.resolvedPkgs
 						.callPackage
 				);
-				let generator = nix_go!(call_package(generate_impure.generator));
-				let built = generator.build().await?;
-				// .as_json().await?;
-				dbg!(&built);
+				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+				let built = &generator.build().await?["out"];
+				let mut nix = MyCommand::new("nix");
+				let on: String = on.as_json().await?;
+				nix.arg("copy")
+					.arg("--substitute-on-destination")
+					.comparg("--to", format!("ssh-ng://{on}"))
+					.arg(built);
+				nix.run_nix().await?;
+
+				let session = config.host(&on).await?;
+
+				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+				dbg!(&owners);
+
+				let mut recipients = String::new();
+				for owner in owners {
+					let key = config.key(&owner).await?;
+					recipients.push_str(&format!("-r \"{key}\" "));
+				}
+				recipients.push_str("-e");
+
+				// FIXME: security: created directory might be accessible to other users
+				// This shouldn't be much of a concern, as data is encrypted right after creation, yet
+				// still better to have.
+				let tempdir = session.mktemp_dir().await?;
+
+				let mut gen = session.cmd(built).await?;
+				gen.env("rageArgs", recipients).env("out", &tempdir);
+				gen.run().await?;
+
+				{
+					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+					ensure!(marker == "SUCCESS", "generation not succeeded");
+				}
+
+				let public = session
+					.read_file_bin(format!("{tempdir}/public"))
+					.await
+					.ok();
+				let secret = session
+					.read_file_bin(format!("{tempdir}/secret"))
+					.await
+					.ok();
+				if let Some(secret) = &secret {
+					ensure!(
+						age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+						"builder produced non-encrypted value as secret, this is highly insecure"
+					);
+				}
+				dbg!(&secret);
+				// // .as_json().await?;
+				// dbg!(&built);
 			}
 			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
 				if secret.secret.is_empty() {
 					bail!("no secret {name}");
 				}
-				let data = config.decrypt_on_host(&machine, secret.secret).await?;
+				let host = config.host(&machine).await?;
+				let data = host.decrypt(secret.secret).await?;
 				if plaintext {
 					let s = String::from_utf8(data).context("output is not utf8")?;
 					print!("{s}");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
after · cmds/fleet/src/command.rs
1use std::{2	collections::HashMap,3	ffi::OsStr,4	pin,5	process::Stdio,6	sync::{Arc, Mutex},7	task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25		out.push_str(input);26		return;27	}28	out.push('\'');29	for (i, v) in input.split('\'').enumerate() {30		if i != 0 {31			out.push_str("'\"'\"'");32		}33		out.push_str(v);34	}35	out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38	os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42	command: String,43	args: Vec<String>,44	env: Vec<(String, String)>,45	ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: Some(session),55		}56	}57	pub fn new(cmd: impl AsRef<OsStr>) -> Self {58		assert!(!cmd.as_ref().is_empty());59		Self {60			command: ostoutf8(cmd),61			args: vec![],62			env: vec![],63			ssh_session: None,64		}65	}66	fn into_args(self) -> Vec<String> {67		let mut out = Vec::new();68		if !self.env.is_empty() {69			out.push("env".to_owned());70			for (k, v) in self.env {71				assert!(!k.contains('='));72				out.push(format!("{k}={v}"));73			}74		}75		out.push(self.command);76		out.extend(self.args);77		out78	}7980	/// Translates environment variables into env command execution.81	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).82	///83	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.84	/// Figure out some way to transfer environment using stdio?85	fn translate_env_into_env(self) -> Self {86		if self.env.is_empty() {87			return self;88		}89		let mut out = Self::new("env");90		if let Some(session) = self.ssh_session {91			out = out.ssh_session(session);92		}93		for (k, v) in self.env {94			assert!(!k.contains('='));95			out.arg(format!("{k}={v}"));96		}97		out.arg(self.command);98		out.args(self.args);99100		out101	}102	fn into_string(self) -> String {103		let mut out = String::new();104		if !self.env.is_empty() {105			out.push_str("env");106			for (k, v) in self.env {107				out.push(' ');108				assert!(!k.contains('='));109				escape_bash(&k, &mut out);110				out.push('=');111				escape_bash(&v, &mut out);112			}113		}114		if !out.is_empty() {115			out.push(' ');116		}117		escape_bash(&self.command, &mut out);118		for arg in self.args {119			out.push(' ');120			escape_bash(&arg, &mut out);121		}122		out123	}124	fn into_command(self) -> Command {125		let mut out = Command::new(self.command);126		out.args(self.args);127		for (k, v) in self.env {128			out.env(k, v);129		}130		out131	}132	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133		Ok(if let Some(session) = self.ssh_session.clone() {134			let cmd = self.translate_env_into_env().into_command();135			Either::Right(136				cmd.over_ssh(session)137					.map_err(|e| anyhow!("ssh error: {e}"))?,138			)139		} else {140			let cmd = self.into_command();141			Either::Left(cmd)142		})143	}144	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145		let arg = arg.as_ref();146		self.args.push(ostoutf8(arg));147		self148	}149	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150		let arg = arg.as_ref();151		let value = value.as_ref();152		let arg = ostoutf8(arg);153		let value = ostoutf8(value);154		self.arg(format!("{arg}={value}"));155		self156	}157	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158		self.arg(arg);159		self.arg(value);160		self161	}162	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163		self.env164			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));165		self166	}167	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168		for arg in args.into_iter() {169			let arg = arg.as_ref();170			self.args.push(ostoutf8(arg));171		}172		self173	}174	pub fn sudo(mut self) -> Self {175		if std::env::var_os("NO_SUDO").is_some() {176			let mut out = Self::new("su");177			out.ssh_session = self.ssh_session.take();178			out.arg("-c").arg(self.into_string());179			out180		} else {181			let mut out = Self::new("sudo");182			out.args(self.into_args());183			out184		}185	}186	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187		self.ssh_session = Some(on);188		self189	}190	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191		let mut out = Self::new("ssh");192		out.ssh_session = self.ssh_session.take();193		out.arg(on).arg("--");194		out.arg(self.into_string());195		out196	}197198	pub async fn run(self) -> Result<()> {199		let str = self.clone().into_string();200		let cmd = self.into_command_new()?;201		match cmd {202			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204		};205		Ok(())206	}207	pub async fn run_string(self) -> Result<String> {208		let bytes = self.run_bytes().await?;209		Ok(String::from_utf8(bytes)?)210	}211	pub async fn run_bytes(self) -> Result<Vec<u8>> {212		let str = self.clone().into_string();213		let cmd = self.into_command_new()?;214		let v = match cmd {215			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217		};218		Ok(v)219	}220221	pub async fn run_nix_string(self) -> Result<String> {222		let str = self.clone().into_string();223		let mut cmd = self.into_command();224		cmd.arg("--log-format").arg("internal-json");225		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226		Ok(String::from_utf8(bytes)?)227	}228	pub async fn run_nix(self) -> Result<()> {229		let str = self.clone().into_string();230		let mut cmd = self.into_command();231		cmd.arg("--log-format").arg("internal-json");232		cmd.stdout(Stdio::inherit());233		run_nix_inner(str, cmd, &mut NixHandler::default()).await234	}235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239	fn poll_read(240		self: std::pin::Pin<&mut Self>,241		_cx: &mut std::task::Context<'_>,242		_buf: &mut tokio::io::ReadBuf<'_>,243	) -> Poll<std::io::Result<()>> {244		Poll::Pending245	}246}247248async fn run_nix_inner_stdout(249	str: String,250	cmd: Command,251	handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253	Ok(run_nix_inner_raw(str, cmd, true, handler, None)254		.await?255		.expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259	assert!(v.is_none());260	Ok(())261}262async fn run_nix_inner_stdout_ssh(263	str: String,264	cmd: OwningCommand<Arc<Session>>,265	handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268		.await?269		.expect("has out"))270}271async fn run_nix_inner_ssh(272	str: String,273	cmd: OwningCommand<Arc<Session>>,274	handler: &mut dyn Handler,275) -> Result<()> {276	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277	assert!(v.is_none());278	Ok(())279}280281pub trait Handler: Send {282	fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287	fn clone(&self) -> Self {288		Self(self.0.clone())289	}290}291impl<H> ClonableHandler<H> {292	pub fn new(inner: H) -> Self {293		Self(Arc::new(Mutex::new(inner)))294	}295}296impl<H: Handler> Handler for ClonableHandler<H> {297	fn handle_line(&mut self, e: &str) {298		self.0.lock().unwrap().handle_line(e)299	}300}301302struct PlainHandler;303impl Handler for PlainHandler {304	fn handle_line(&mut self, e: &str) {305		info!(target: "log", "{e}");306	}307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311	fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316	spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319	static OSC_CLEANER: Lazy<Regex> =320		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322	let m = OSC_CLEANER.replace_all(m, "");323	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325	DETABBER.replace_all(m.as_ref(), "  ").to_string()326}327impl Handler for NixHandler {328	fn handle_line(&mut self, e: &str) {329		if let Some(e) = e.strip_prefix("@nix ") {330			let log: NixLog = match serde_json::from_str(e) {331				Ok(l) => l,332				Err(err) => {333					warn!("failed to parse nix log line {:?}: {}", e, err);334					return;335				}336			};337			match log {338				NixLog::Msg { msg, raw_msg, .. } => {339					#[allow(clippy::nonminimal_bool)]340					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343						if let Some(raw_msg) = raw_msg {344							if !msg.is_empty() {345								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346							} else {347								info!(target: "nix", "{}", raw_msg.trim_end())348							}349						} else {350							info!(target: "nix", "{}", msg.trim_end())351						}352					}353				}354				NixLog::Start {355					ref fields,356					typ,357					id,358					..359				} if typ == 105 && !fields.is_empty() => {360					if let [LogField::String(drv), ..] = &fields[..] {361						let mut drv = drv.as_str();362						if let Some(pkg) = drv.strip_prefix("/nix/store/") {363							let mut it = pkg.splitn(2, '-');364							it.next();365							if let Some(pkg) = it.next() {366								drv = pkg;367							}368						}369						info!(target: "nix","building {}", drv);370						let span = info_span!("build", drv);371						span.pb_start();372						self.spans.insert(id, span);373					} else {374						warn!("bad build log: {:?}", log)375					}376				}377				NixLog::Start {378					ref fields,379					typ,380					id,381					..382				} if typ == 100 && fields.len() >= 3 => {383					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384						&fields[..]385					{386						let mut drv = drv.as_str();387388						if let Some(pkg) = drv.strip_prefix("/nix/store/") {389							let mut it = pkg.splitn(2, '-');390							it.next();391							if let Some(pkg) = it.next() {392								drv = pkg;393							}394						}395						// info!(target: "nix","copying {} {} -> {}", drv, from, to);396						let span = info_span!("copy", from, to, drv);397						span.pb_start();398						self.spans.insert(id, span);399					} else {400						warn!("bad copy log: {:?}", log)401					}402				}403				NixLog::Start { text, typ, id, .. }404					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405				{406					if !text.is_empty()407						&& text != "querying info about missing paths"408						&& text != "copying 0 paths"409						// Too much spam on lazy-trees branch410						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))411					{412						let span = info_span!("job");413						span.pb_start();414						span.pb_set_message(&process_message(text.trim()));415						self.spans.insert(id, span);416						info!(target: "nix", "{}", text);417					}418				}419				NixLog::Start {420					text,421					level: 0,422					typ: 108,423					..424				} if text.is_empty() => {425					// Cache lookup? Coupled with copy log426				}427				NixLog::Start {428					text,429					level: 4,430					typ: 109,431					..432				} if text.starts_with("querying info about ") => {433					// Cache lookup434				}435				NixLog::Start {436					text,437					level: 4,438					typ: 101,439					..440				} if text.starts_with("downloading ") => {441					// NAR downloading, coupled with copy log442				}443				NixLog::Start {444					text,445					level: 1,446					typ: 111,447					..448				} if text.starts_with("waiting for a machine to build ") => {449					// Useless repeating notification about build450				}451				NixLog::Start {452					text,453					level: 3,454					typ: 111,455					..456				} if text.starts_with("resolved derivation: ") => {457					// CA resolved458				}459				NixLog::Start {460					text,461					level: 1,462					typ: 111,463					id,464					..465				} if text.starts_with("waiting for lock on ") => {466					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468						drv = txt;469					}470					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471						drv = txt;472					}473					if let Some(txt) = drv.split("', '").next() {474						drv = txt;475					}476					if let Some(pkg) = drv.strip_prefix("/nix/store/") {477						let mut it = pkg.splitn(2, '-');478						it.next();479						if let Some(pkg) = it.next() {480							drv = pkg;481						}482					}483					let span = info_span!("waiting on drv", drv);484					span.pb_start();485					self.spans.insert(id, span);486					// Concurrent build of the same message487				}488				NixLog::Stop { id, .. } => {489					self.spans.remove(&id);490				}491				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492					if let Some(span) = self.spans.get(&id) {493						if let LogField::String(s) = &fields[0] {494							span.pb_set_message(&process_message(s.trim()));495						} else {496							warn!("bad fields: {fields:?}");497						}498					} else {499						warn!("unknown result id: {id} {typ} {fields:?}");500					}501					// dbg!(fields, id, typ);502				}503				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504					if let Some(span) = self.spans.get(&id) {505						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506							&fields[..4]507						{508							span.pb_set_length(*expected);509							span.pb_set_position(*done);510						} else {511							warn!("bad fields: {fields:?}");512						}513					} else {514						// warn!("unknown result id: {id} {typ} {fields:?}");515						// Unaccounted progress.516					}517					// dbg!(fields, id, typ);518				}519				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520					// Set phase, expected521				}522				_ => warn!("unknown log: {:?}", log),523			};524		} else {525			let e = e.trim();526			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527				return;528			}529			info!("{e}")530		}531	}532}533534async fn run_nix_inner_raw(535	str: String,536	mut cmd: Command,537	want_stdout: bool,538	err_handler: &mut dyn Handler,539	mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541	cmd.stderr(Stdio::piped());542	cmd.stdout(Stdio::piped());543	let mut child = cmd.spawn()?;544	let mut stderr = child.stderr.take().unwrap();545	let stdout = child.stdout.take().unwrap();546	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548	let mut ob = want_stdout549		.then(|| out.take().unwrap())550		.unwrap_or_else(|| Box::new(EmptyAsyncRead));551	let mut ol = (!want_stdout)552		.then(|| out.take().unwrap())553		.unwrap_or_else(|| Box::new(EmptyAsyncRead));554	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557	// while let Some(line) = read.next().await? {}558559	let mut out_buf = if want_stdout { Some(vec![]) } else { None };560	loop {561		select! {562			e = err.next() => {563				if let Some(e) = e {564					let e = e?;565					err_handler.handle_line(&e);566				}567			},568			o = ob.next() => {569				if let Some(o) = o {570					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571				}572			},573			o = ol.next() => {574				if let Some(o) = o {575					let o = o?;576					if let Some(out) = out_handler.as_mut() {577						out.handle_line(&o)578					} else {579						err_handler.handle_line(&o)580					}581					// out_handler.handle_info(&o);582				}583			},584			code = child.wait() => {585				let code = code?;586				if !code.success() {587					anyhow::bail!("command '{str}' failed with status {}", code);588				}589				break;590			}591		}592	}593594	Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597	str: String,598	mut cmd: OwningCommand<Arc<Session>>,599	want_stdout: bool,600	err_handler: &mut dyn Handler,601	mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603	cmd.stderr(openssh::Stdio::piped());604	cmd.stdout(openssh::Stdio::piped());605	let mut child = cmd.spawn().await?;606	let mut stderr = child.stderr().take().unwrap();607	let stdout = child.stdout().take().unwrap();608	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610	let mut ob = want_stdout611		.then(|| out.take().unwrap())612		.unwrap_or_else(|| Box::new(EmptyAsyncRead));613	let mut ol = (!want_stdout)614		.then(|| out.take().unwrap())615		.unwrap_or_else(|| Box::new(EmptyAsyncRead));616	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619	// while let Some(line) = read.next().await? {}620621	let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623	let mut wait_future = pin::pin!(child.wait());624	loop {625		select! {626			e = err.next() => {627				if let Some(e) = e {628					let e = e?;629					err_handler.handle_line(&e);630				}631			},632			o = ob.next() => {633				if let Some(o) = o {634					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635				}636			},637			o = ol.next() => {638				if let Some(o) = o {639					let o = o?;640					if let Some(out) = out_handler.as_mut() {641						out.handle_line(&o)642					} else {643						err_handler.handle_line(&o)644					}645					// out_handler.handle_info(&o);646				}647			},648			code = &mut wait_future => {649				let code = code?;650				if !code.success() {651					anyhow::bail!("command '{str}' failed with status {}", code);652				}653				break;654			}655		}656	}657658	Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662	/// Return true to discard message from logging663	fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668	String(String),669	Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674	where675		D: serde::Deserializer<'de>,676	{677		struct StringOrNum;678		impl<'de> Visitor<'de> for StringOrNum {679			type Value = LogField;680681			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682				write!(f, "string or unsigned")683			}684685			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686			where687				E: serde::de::Error,688			{689				Ok(LogField::String(v.to_owned()))690			}691692			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693			where694				E: serde::de::Error,695			{696				Ok(LogField::Num(v))697			}698		}699700		deserializer.deserialize_any(StringOrNum)701	}702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708	Msg {709		level: u32,710		msg: String,711		raw_msg: Option<String>,712	},713	Start {714		id: u64,715		level: u32,716		#[serde(default)]717		fields: Vec<LogField>,718		text: String,719		#[serde(rename = "type")]720		typ: u32,721	},722	Stop {723		id: u64,724	},725	Result {726		id: u64,727		#[serde(rename = "type")]728		typ: u32,729		#[serde(default)]730		fields: Vec<LogField>,731	},732}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }