git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/build_systems.rs
1use std::os::unix::fs::symlink;2use std::path::PathBuf;3use std::{env::current_dir, time::Duration};45use crate::command::MyCommand;6use crate::host::Config;7use crate::nix_go;8use anyhow::{anyhow, Result};9use clap::Parser;10use itertools::Itertools;11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};1314#[derive(Parser, Clone)]15pub struct BuildSystems {16	/// Disable automatic rollback17	#[clap(long)]18	disable_rollback: bool,19	#[clap(subcommand)]20	subcommand: Subcommand,21}2223enum UploadAction {24	Test,25	Boot,26	Switch,27}28impl UploadAction {29	fn name(&self) -> &'static str {30		match self {31			UploadAction::Test => "test",32			UploadAction::Boot => "boot",33			UploadAction::Switch => "switch",34		}35	}3637	pub(crate) fn should_switch_profile(&self) -> bool {38		matches!(self, Self::Switch | Self::Boot)39	}40	pub(crate) fn should_activate(&self) -> bool {41		matches!(self, Self::Switch | Self::Test)42	}43	pub(crate) fn should_schedule_rollback_run(&self) -> bool {44		matches!(self, Self::Switch | Self::Test)45	}46}4748enum PackageAction {49	SdImage,50	InstallationCd,51}52impl PackageAction {53	fn build_attr(&self) -> String {54		match self {55			PackageAction::SdImage => "sdImage".to_owned(),56			PackageAction::InstallationCd => "installationCd".to_owned(),57		}58	}59}6061enum Action {62	Upload { action: Option<UploadAction> },63	Package(PackageAction),64}65impl Action {66	fn build_attr(&self) -> String {67		match self {68			Action::Upload { .. } => "toplevel".to_owned(),69			Action::Package(p) => p.build_attr(),70		}71	}72}7374impl From<Subcommand> for Action {75	fn from(s: Subcommand) -> Self {76		match s {77			Subcommand::Upload => Self::Upload { action: None },78			Subcommand::Test => Self::Upload {79				action: Some(UploadAction::Test),80			},81			Subcommand::Boot => Self::Upload {82				action: Some(UploadAction::Boot),83			},84			Subcommand::Switch => Self::Upload {85				action: Some(UploadAction::Switch),86			},87			Subcommand::SdImage => Self::Package(PackageAction::SdImage),88			Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),89		}90	}91}9293#[derive(Parser, Clone)]94enum Subcommand {95	/// Upload, but do not switch96	Upload,97	/// Upload + switch to built system until reboot98	Test,99	/// Upload + switch to built system after reboot100	Boot,101	/// Upload + test + boot102	Switch,103104	/// Build SD .img image105	SdImage,106	/// Build an installation cd ISO image107	InstallationCd,108}109110struct Generation {111	id: u32,112	current: bool,113	datetime: String,114}115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {116	let mut cmd = MyCommand::new("nix-env");117	cmd.comparg("--profile", "/nix/var/nix/profiles/system")118		.arg("--list-generations");119	// Sudo is required due to --list-generations acquiring lock on the profile.120	let data = config.run_string_on(host, cmd, true).await?;121	let generations = data122		.split('\n')123		.map(|e| e.trim())124		.filter(|&l| !l.is_empty())125		.filter_map(|g| {126			let gen: Option<Generation> = try {127				let mut parts = g.split_whitespace();128				let id = parts.next()?;129				let id: u32 = id.parse().ok()?;130				let date = parts.next()?;131				let time = parts.next()?;132				let current = if let Some(current) = parts.next() {133					if current == "(current)" {134						Some(true)135					} else {136						None137					}138				} else {139					Some(false)140				};141				let current = current?;142				if parts.next().is_some() {143					warn!("unexpected text after generation: {g}");144				}145				Generation {146					id,147					current,148					datetime: format!("{date} {time}"),149				}150			};151			if gen.is_none() {152				warn!("bad generation: {g}")153			}154			gen155		})156		.collect::<Vec<_>>();157	let current = generations158		.into_iter()159		.filter(|g| g.current)160		.at_most_one()161		.map_err(|_e| anyhow!("bad list-generations output"))?162		.ok_or_else(|| anyhow!("failed to find generation"))?;163	Ok(current)164}165166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {167	let mut cmd = MyCommand::new("systemctl");168	cmd.arg("stop").arg(unit);169	config.run_on(host, cmd, true).await170}171172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {173	let mut cmd = MyCommand::new("systemctl");174	cmd.arg("start").arg(unit);175	config.run_on(host, cmd, true).await176}177178async fn execute_upload(179	build: &BuildSystems,180	config: &Config,181	action: UploadAction,182	host: &str,183	built: PathBuf,184) -> Result<()> {185	let mut failed = false;186	// TODO: Lockfile, to prevent concurrent system switch?187	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback188	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to189	// unit name conflict in systemd-run190	// This code is tied to rollback.nix191	if !build.disable_rollback {192		let _span = info_span!("preparing").entered();193		info!("preparing for rollback");194		let generation = get_current_generation(config, host).await?;195		info!(196			"rollback target would be {} {}",197			generation.id, generation.datetime198		);199		{200			let mut cmd = MyCommand::new("sh");201			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));202			if let Err(e) = config.run_on(host, cmd, true).await {203				error!("failed to set rollback marker: {e}");204				failed = true;205			}206		}207		// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.208		// Kicking it on manually will work best.209		//210		// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will211		// only allow one instance of it.212213		// TODO: We should also watch how this process is going.214		// After running this command, we have less than 3 minutes to deploy everything,215		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.216		// Anyway, reboot will still help in this case.217		if action.should_schedule_rollback_run() {218			let mut cmd = MyCommand::new("systemd-run");219			cmd.comparg("--on-active", "3min")220				.comparg("--unit", "rollback-watchdog-run")221				.arg("systemctl")222				.arg("start")223				.arg("rollback-watchdog.service");224			if let Err(e) = config.run_on(host, cmd, true).await {225				error!("failed to schedule rollback run: {e}");226				failed = true;227			}228		}229	}230	if action.should_switch_profile() && !failed {231		info!("switching generation");232		let mut cmd = MyCommand::new("nix-env");233		cmd.comparg("--profile", "/nix/var/nix/profiles/system")234			.comparg("--set", &built);235		if let Err(e) = config.run_on(host, cmd, true).await {236			error!("failed to switch generation: {e}");237			failed = true;238		}239	}240	if action.should_activate() && !failed {241		let _span = info_span!("activating").entered();242		info!("executing activation script");243		let mut switch_script = built.clone();244		switch_script.push("bin");245		switch_script.push("switch-to-configuration");246		let mut cmd = MyCommand::new(switch_script);247		cmd.arg(action.name());248		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {249			error!("failed to activate: {e}");250			failed = true;251		}252	}253	if !build.disable_rollback {254		if failed {255			info!("executing rollback");256			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")257				.instrument(info_span!("rollback"))258				.await259			{260				error!("failed to trigger rollback: {e}")261			}262		} else {263			info!("trying to mark upgrade as successful");264			let mut cmd = MyCommand::new("rm");265			cmd.arg("-f").arg("/etc/fleet_rollback_marker");266			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {267				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")268			}269		}270		info!("disarming watchdog, just in case");271		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {272			// It is ok, if there was no reboot - then timer might not be running.273		}274		if action.should_schedule_rollback_run() {275			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {276				error!("failed to disarm rollback run: {e}");277			}278		}279	} else {280		let mut cmd = MyCommand::new("rm");281		cmd.arg("-f").arg("/etc/fleet_rollback_marker");282		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {283			// Marker might not exist, yet better try to remove it.284		}285	}286	Ok(())287}288289impl BuildSystems {290	async fn build_task(self, config: Config, host: String) -> Result<()> {291		info!("building");292		let action = Action::from(self.subcommand.clone());293		let fleet_field = &config.fleet_field;294		let drv = nix_go!(fleet_field.buildSystems(Obj {295			localSystem: { config.local_system.clone() }296		}));297		let outputs = drv.build().await.map_err(|e| {298			if action.build_attr() == "sdImage" {299				info!("sd-image build failed");300				info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");301			}302			e303		})?;304		let out_output = outputs305			.get("out")306			.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;307308		match action {309			Action::Upload { action } => {310				if !config.is_local(&host) {311					info!("uploading system closure");312					{313						// Alternatively, nix store make-content-addressed can be used,314						// at least for the first deployment, to provide trusted store key.315						//316						// It is much slower, yet doesn't require root on the deployer machine.317						let mut sign = MyCommand::new("nix");318						// Private key for host machine is registered in nix-sign.nix319						sign.arg("store")320							.arg("sign")321							.comparg("--key-file", "/etc/nix/private-key")322							.arg("-r")323							.arg(out_output);324						if let Err(e) = sign.sudo().run_nix().await {325							warn!("Failed to sign store paths: {e}");326						};327					}328					let mut tries = 0;329					loop {330						let mut nix = MyCommand::new("nix");331						nix.arg("copy")332							.arg("--substitute-on-destination")333							.comparg("--to", format!("ssh-ng://{host}"))334							.arg(out_output);335						match nix.run_nix().await {336							Ok(()) => break,337							Err(e) if tries < 3 => {338								tries += 1;339								warn!("Copy failure ({}/3): {}", tries, e);340								sleep(Duration::from_millis(5000)).await;341							}342							Err(e) => return Err(e),343						}344					}345				}346				if let Some(action) = action {347					execute_upload(&self, &config, action, &host, out_output.clone()).await?348				}349			}350			Action::Package(PackageAction::SdImage) => {351				let mut out = current_dir()?;352				out.push(format!("sd-image-{}", host));353354				info!("linking sd image to {:?}", out);355				symlink(out_output, out)?;356			}357			Action::Package(PackageAction::InstallationCd) => {358				let mut out = current_dir()?;359				out.push(format!("installation-cd-{}", host));360361				info!("linking iso image to {:?}", out);362				symlink(out_output, out)?;363			}364		};365		Ok(())366	}367368	pub async fn run(self, config: &Config) -> Result<()> {369		let hosts = config.list_hosts().await?;370		let set = LocalSet::new();371		let this = &self;372		for host in hosts.into_iter() {373			if config.should_skip(&host.name) {374				continue;375			}376			let config = config.clone();377			let this = this.clone();378			let span = info_span!("deployment", host = field::display(&host.name));379			let hostname = host.name;380			set.spawn_local(381				(async move {382					match this.build_task(config, hostname).await {383						Ok(_) => {}384						Err(e) => {385							error!("failed to deploy host: {}", e)386						}387					}388				})389				.instrument(span),390			);391		}392		set.await;393		Ok(())394	}395}
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+	command::MyCommand,
 	fleetdata::{FleetSecret, FleetSharedSecret},
 	host::Config,
 	nix_go, nix_go_json,
@@ -12,6 +13,7 @@
 	collections::HashSet,
 	io::{self, Cursor, Read},
 	path::PathBuf,
+	sync::Arc,
 };
 use tabled::{Table, Tabled};
 use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
 			Secret::InvokeGenerator => {
 				let config_field = &config.config_unchecked_field;
 
-				let generate_impure =
-					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let secret =
+					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+				let generate_impure = nix_go!(secret.generateImpure);
 				let on = nix_go!(generate_impure.on);
 				let call_package = nix_go!(
 					config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
 					})[on]
 						.config
 						.nixpkgs
-						.pkgs
+						.resolvedPkgs
 						.callPackage
 				);
-				let generator = nix_go!(call_package(generate_impure.generator));
-				let built = generator.build().await?;
-				// .as_json().await?;
-				dbg!(&built);
+				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+				let built = &generator.build().await?["out"];
+				let mut nix = MyCommand::new("nix");
+				let on: String = on.as_json().await?;
+				nix.arg("copy")
+					.arg("--substitute-on-destination")
+					.comparg("--to", format!("ssh-ng://{on}"))
+					.arg(built);
+				nix.run_nix().await?;
+
+				let session = config.host(&on).await?;
+
+				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+				dbg!(&owners);
+
+				let mut recipients = String::new();
+				for owner in owners {
+					let key = config.key(&owner).await?;
+					recipients.push_str(&format!("-r \"{key}\" "));
+				}
+				recipients.push_str("-e");
+
+				// FIXME: security: created directory might be accessible to other users
+				// This shouldn't be much of a concern, as data is encrypted right after creation, yet
+				// still better to have.
+				let tempdir = session.mktemp_dir().await?;
+
+				let mut gen = session.cmd(built).await?;
+				gen.env("rageArgs", recipients).env("out", &tempdir);
+				gen.run().await?;
+
+				{
+					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+					ensure!(marker == "SUCCESS", "generation not succeeded");
+				}
+
+				let public = session
+					.read_file_bin(format!("{tempdir}/public"))
+					.await
+					.ok();
+				let secret = session
+					.read_file_bin(format!("{tempdir}/secret"))
+					.await
+					.ok();
+				if let Some(secret) = &secret {
+					ensure!(
+						age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+						"builder produced non-encrypted value as secret, this is highly insecure"
+					);
+				}
+				dbg!(&secret);
+				// // .as_json().await?;
+				// dbg!(&built);
 			}
 			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
 				if secret.secret.is_empty() {
 					bail!("no secret {name}");
 				}
-				let data = config.decrypt_on_host(&machine, secret.secret).await?;
+				let host = config.host(&machine).await?;
+				let data = host.decrypt(secret.secret).await?;
 				if plaintext {
 					let s = String::from_utf8(data).context("output is not utf8")?;
 					print!("{s}");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
 use std::{
 	collections::HashMap,
 	ffi::OsStr,
+	pin,
 	process::Stdio,
 	sync::{Arc, Mutex},
 	task::Poll,
@@ -10,7 +11,7 @@
 use futures::StreamExt;
 use itertools::Either;
 use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
 use regex::Regex;
 use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
 	ssh_session: Option<Arc<Session>>,
 }
 impl MyCommand {
+	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+		assert!(!cmd.as_ref().is_empty());
+		Self {
+			command: ostoutf8(cmd),
+			args: vec![],
+			env: vec![],
+			ssh_session: Some(session),
+		}
+	}
 	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
 		assert!(!cmd.as_ref().is_empty());
 		Self {
@@ -66,6 +76,29 @@
 		out.extend(self.args);
 		out
 	}
+
+	/// Translates environment variables into env command execution.
+	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+	///
+	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+	/// Figure out some way to transfer environment using stdio?
+	fn translate_env_into_env(self) -> Self {
+		if self.env.is_empty() {
+			return self;
+		}
+		let mut out = Self::new("env");
+		if let Some(session) = self.ssh_session {
+			out = out.ssh_session(session);
+		}
+		for (k, v) in self.env {
+			assert!(!k.contains('='));
+			out.arg(format!("{k}={v}"));
+		}
+		out.arg(self.command);
+		out.args(self.args);
+
+		out
+	}
 	fn into_string(self) -> String {
 		let mut out = String::new();
 		if !self.env.is_empty() {
@@ -98,7 +131,7 @@
 	}
 	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
 		Ok(if let Some(session) = self.ssh_session.clone() {
-			let cmd = self.into_command();
+			let cmd = self.translate_env_into_env().into_command();
 			Either::Right(
 				cmd.over_ssh(session)
 					.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
 		self.arg(value);
 		self
 	}
+	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+		self.env
+			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+		self
+	}
 	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
 		for arg in args.into_iter() {
 			let arg = arg.as_ref();
@@ -133,9 +171,10 @@
 		}
 		self
 	}
-	pub fn sudo(self) -> Self {
+	pub fn sudo(mut self) -> Self {
 		if std::env::var_os("NO_SUDO").is_some() {
 			let mut out = Self::new("su");
+			out.ssh_session = self.ssh_session.take();
 			out.arg("-c").arg(self.into_string());
 			out
 		} else {
@@ -144,27 +183,38 @@
 			out
 		}
 	}
-	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+		self.ssh_session = Some(on);
+		self
+	}
+	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
 		let mut out = Self::new("ssh");
+		out.ssh_session = self.ssh_session.take();
 		out.arg(on).arg("--");
 		out.arg(self.into_string());
 		out
 	}
-	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
-		self.ssh_session = Some(session);
-		self
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		run_nix_inner(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		match cmd {
+			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(())
 	}
 	pub async fn run_string(self) -> Result<String> {
+		let bytes = self.run_bytes().await?;
+		Ok(String::from_utf8(bytes)?)
+	}
+	pub async fn run_bytes(self) -> Result<Vec<u8>> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
-		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+		let cmd = self.into_command_new()?;
+		let v = match cmd {
+			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+		};
 		Ok(v)
 	}
 
@@ -172,7 +222,8 @@
 		let str = self.clone().into_string();
 		let mut cmd = self.into_command();
 		cmd.arg("--log-format").arg("internal-json");
-		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+		Ok(String::from_utf8(bytes)?)
 	}
 	pub async fn run_nix(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -198,7 +249,7 @@
 	str: String,
 	cmd: Command,
 	handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
 	Ok(run_nix_inner_raw(str, cmd, true, handler, None)
 		.await?
 		.expect("has out"))
@@ -208,6 +259,24 @@
 	assert!(v.is_none());
 	Ok(())
 }
+async fn run_nix_inner_stdout_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+		.await?
+		.expect("has out"))
+}
+async fn run_nix_inner_ssh(
+	str: String,
+	cmd: OwningCommand<Arc<Session>>,
+	handler: &mut dyn Handler,
+) -> Result<()> {
+	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+	assert!(v.is_none());
+	Ok(())
+}
 
 pub trait Handler: Send {
 	fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
 	want_stdout: bool,
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
 	let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
 		}
 	}
 
-	Ok(out_buf.map(String::from_utf8).transpose()?)
+	Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+	str: String,
+	mut cmd: OwningCommand<Arc<Session>>,
+	want_stdout: bool,
+	err_handler: &mut dyn Handler,
+	mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+	cmd.stderr(openssh::Stdio::piped());
+	cmd.stdout(openssh::Stdio::piped());
+	let mut child = cmd.spawn().await?;
+	let mut stderr = child.stderr().take().unwrap();
+	let stdout = child.stdout().take().unwrap();
+	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+	let mut ob = want_stdout
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ol = (!want_stdout)
+		.then(|| out.take().unwrap())
+		.unwrap_or_else(|| Box::new(EmptyAsyncRead));
+	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+	// while let Some(line) = read.next().await? {}
+
+	let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+	let mut wait_future = pin::pin!(child.wait());
+	loop {
+		select! {
+			e = err.next() => {
+				if let Some(e) = e {
+					let e = e?;
+					err_handler.handle_line(&e);
+				}
+			},
+			o = ob.next() => {
+				if let Some(o) = o {
+					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+				}
+			},
+			o = ol.next() => {
+				if let Some(o) = o {
+					let o = o?;
+					if let Some(out) = out_handler.as_mut() {
+						out.handle_line(&o)
+					} else {
+						err_handler.handle_line(&o)
+					}
+					// out_handler.handle_info(&o);
+				}
+			},
+			code = &mut wait_future => {
+				let code = code?;
+				if !code.success() {
+					anyhow::bail!("command '{str}' failed with status {}", code);
+				}
+				break;
+			}
+		}
+	}
+
+	Ok(out_buf)
 }
 
 pub trait ErrorRecorder: Send {
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }