git.delta.rocks / jrsonnet / refs/commits / a369041a95eb

difftreelog

refactor shell abstraction

Yaroslav Bolyukin2023-12-28parent: #7e2e5c5.patch.diff
in: trunk

6 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
 	($field:ident $($tt:tt)*) => {{
 		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
 		#[allow(unused_mut, reason = "might be used if indexed")]
-		let mut out = NixExprBuilder::field($field);
+		let mut out = NixExprBuilder::field($field.clone());
 		nix_expr_inner!(@field(out) $($tt)*);
 		out
 	}};
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
-		let drv = nix_go!(fleet_field.buildSystems(Obj {
-			localSystem: { config.local_system.clone() }
-		}));
+		let drv = nix_go!(
+			fleet_field.buildSystems(Obj {
+				localSystem: { config.local_system.clone() }
+			})[{ action.build_attr() }][{ host }]
+		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,4 +1,5 @@
 use crate::{
+	command::MyCommand,
 	fleetdata::{FleetSecret, FleetSharedSecret},
 	host::Config,
 	nix_go, nix_go_json,
@@ -12,6 +13,7 @@
 	collections::HashSet,
 	io::{self, Cursor, Read},
 	path::PathBuf,
+	sync::Arc,
 };
 use tabled::{Table, Tabled};
 use tokio::fs::read_to_string;
@@ -97,8 +99,9 @@
 			Secret::InvokeGenerator => {
 				let config_field = &config.config_unchecked_field;
 
-				let generate_impure =
-					nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);
+				let secret =
+					nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
+				let generate_impure = nix_go!(secret.generateImpure);
 				let on = nix_go!(generate_impure.on);
 				let call_package = nix_go!(
 					config_field.buildableSystems(Obj {
@@ -106,13 +109,62 @@
 					})[on]
 						.config
 						.nixpkgs
-						.pkgs
+						.resolvedPkgs
 						.callPackage
 				);
-				let generator = nix_go!(call_package(generate_impure.generator));
-				let built = generator.build().await?;
-				// .as_json().await?;
-				dbg!(&built);
+				let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
+				let built = &generator.build().await?["out"];
+				let mut nix = MyCommand::new("nix");
+				let on: String = on.as_json().await?;
+				nix.arg("copy")
+					.arg("--substitute-on-destination")
+					.comparg("--to", format!("ssh-ng://{on}"))
+					.arg(built);
+				nix.run_nix().await?;
+
+				let session = config.host(&on).await?;
+
+				let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
+				dbg!(&owners);
+
+				let mut recipients = String::new();
+				for owner in owners {
+					let key = config.key(&owner).await?;
+					recipients.push_str(&format!("-r \"{key}\" "));
+				}
+				recipients.push_str("-e");
+
+				// FIXME: security: created directory might be accessible to other users
+				// This shouldn't be much of a concern, as data is encrypted right after creation, yet
+				// still better to have.
+				let tempdir = session.mktemp_dir().await?;
+
+				let mut gen = session.cmd(built).await?;
+				gen.env("rageArgs", recipients).env("out", &tempdir);
+				gen.run().await?;
+
+				{
+					let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
+					ensure!(marker == "SUCCESS", "generation not succeeded");
+				}
+
+				let public = session
+					.read_file_bin(format!("{tempdir}/public"))
+					.await
+					.ok();
+				let secret = session
+					.read_file_bin(format!("{tempdir}/secret"))
+					.await
+					.ok();
+				if let Some(secret) = &secret {
+					ensure!(
+						age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+						"builder produced non-encrypted value as secret, this is highly insecure"
+					);
+				}
+				dbg!(&secret);
+				// // .as_json().await?;
+				// dbg!(&built);
 			}
 			Secret::ForceKeys => {
 				for host in config.list_hosts().await? {
@@ -249,7 +301,8 @@
 				if secret.secret.is_empty() {
 					bail!("no secret {name}");
 				}
-				let data = config.decrypt_on_host(&machine, secret.secret).await?;
+				let host = config.host(&machine).await?;
+				let data = host.decrypt(secret.secret).await?;
 				if plaintext {
 					let s = String::from_utf8(data).context("output is not utf8")?;
 					print!("{s}");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
1use std::{1use std::{
2 collections::HashMap,2 collections::HashMap,
3 ffi::OsStr,3 ffi::OsStr,
4 pin,
4 process::Stdio,5 process::Stdio,
5 sync::{Arc, Mutex},6 sync::{Arc, Mutex},
6 task::Poll,7 task::Poll,
10use futures::StreamExt;11use futures::StreamExt;
11use itertools::Either;12use itertools::Either;
12use once_cell::sync::Lazy;13use once_cell::sync::Lazy;
13use openssh::{OverSsh, Session};14use openssh::{OverSsh, OwningCommand, Session};
14use regex::Regex;15use regex::Regex;
15use serde::{de::Visitor, Deserialize};16use serde::{de::Visitor, Deserialize};
16use tokio::{io::AsyncRead, process::Command, select};17use tokio::{io::AsyncRead, process::Command, select};
44 ssh_session: Option<Arc<Session>>,45 ssh_session: Option<Arc<Session>>,
45}46}
46impl MyCommand {47impl MyCommand {
48 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
49 assert!(!cmd.as_ref().is_empty());
50 Self {
51 command: ostoutf8(cmd),
52 args: vec![],
53 env: vec![],
54 ssh_session: Some(session),
55 }
56 }
47 pub fn new(cmd: impl AsRef<OsStr>) -> Self {57 pub fn new(cmd: impl AsRef<OsStr>) -> Self {
48 assert!(!cmd.as_ref().is_empty());58 assert!(!cmd.as_ref().is_empty());
49 Self {59 Self {
67 out77 out
68 }78 }
79
80 /// Translates environment variables into env command execution.
81 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).
82 ///
83 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.
84 /// Figure out some way to transfer environment using stdio?
85 fn translate_env_into_env(self) -> Self {
86 if self.env.is_empty() {
87 return self;
88 }
89 let mut out = Self::new("env");
90 if let Some(session) = self.ssh_session {
91 out = out.ssh_session(session);
92 }
93 for (k, v) in self.env {
94 assert!(!k.contains('='));
95 out.arg(format!("{k}={v}"));
96 }
97 out.arg(self.command);
98 out.args(self.args);
99
100 out
101 }
69 fn into_string(self) -> String {102 fn into_string(self) -> String {
70 let mut out = String::new();103 let mut out = String::new();
71 if !self.env.is_empty() {104 if !self.env.is_empty() {
98 }131 }
99 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {132 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
100 Ok(if let Some(session) = self.ssh_session.clone() {133 Ok(if let Some(session) = self.ssh_session.clone() {
101 let cmd = self.into_command();134 let cmd = self.translate_env_into_env().into_command();
102 Either::Right(135 Either::Right(
103 cmd.over_ssh(session)136 cmd.over_ssh(session)
104 .map_err(|e| anyhow!("ssh error: {e}"))?,137 .map_err(|e| anyhow!("ssh error: {e}"))?,
126 self.arg(value);159 self.arg(value);
127 self160 self
128 }161 }
162 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
163 self.env
164 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
165 self
166 }
129 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {167 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
130 for arg in args.into_iter() {168 for arg in args.into_iter() {
131 let arg = arg.as_ref();169 let arg = arg.as_ref();
132 self.args.push(ostoutf8(arg));170 self.args.push(ostoutf8(arg));
133 }171 }
134 self172 self
135 }173 }
136 pub fn sudo(self) -> Self {174 pub fn sudo(mut self) -> Self {
137 if std::env::var_os("NO_SUDO").is_some() {175 if std::env::var_os("NO_SUDO").is_some() {
138 let mut out = Self::new("su");176 let mut out = Self::new("su");
177 out.ssh_session = self.ssh_session.take();
139 out.arg("-c").arg(self.into_string());178 out.arg("-c").arg(self.into_string());
140 out179 out
141 } else {180 } else {
144 out183 out
145 }184 }
146 }185 }
186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
187 self.ssh_session = Some(on);
188 self
189 }
147 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
148 let mut out = Self::new("ssh");191 let mut out = Self::new("ssh");
192 out.ssh_session = self.ssh_session.take();
149 out.arg(on).arg("--");193 out.arg(on).arg("--");
150 out.arg(self.into_string());194 out.arg(self.into_string());
151 out195 out
152 }196 }
153 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
154 self.ssh_session = Some(session);
155 self
156 }
157197
158 pub async fn run(self) -> Result<()> {198 pub async fn run(self) -> Result<()> {
159 let str = self.clone().into_string();199 let str = self.clone().into_string();
160 let cmd = self.into_command();200 let cmd = self.into_command_new()?;
201 match cmd {
161 run_nix_inner(str, cmd, &mut PlainHandler).await?;202 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
203 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
204 };
162 Ok(())205 Ok(())
163 }206 }
164 pub async fn run_string(self) -> Result<String> {207 pub async fn run_string(self) -> Result<String> {
208 let bytes = self.run_bytes().await?;
209 Ok(String::from_utf8(bytes)?)
210 }
211 pub async fn run_bytes(self) -> Result<Vec<u8>> {
165 let str = self.clone().into_string();212 let str = self.clone().into_string();
166 let cmd = self.into_command();213 let cmd = self.into_command_new()?;
167 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;214 let v = match cmd {
215 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
216 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
217 };
168 Ok(v)218 Ok(v)
169 }219 }
170220
171 pub async fn run_nix_string(self) -> Result<String> {221 pub async fn run_nix_string(self) -> Result<String> {
172 let str = self.clone().into_string();222 let str = self.clone().into_string();
173 let mut cmd = self.into_command();223 let mut cmd = self.into_command();
174 cmd.arg("--log-format").arg("internal-json");224 cmd.arg("--log-format").arg("internal-json");
175 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await225 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
226 Ok(String::from_utf8(bytes)?)
176 }227 }
177 pub async fn run_nix(self) -> Result<()> {228 pub async fn run_nix(self) -> Result<()> {
178 let str = self.clone().into_string();229 let str = self.clone().into_string();
198 str: String,249 str: String,
199 cmd: Command,250 cmd: Command,
200 handler: &mut dyn Handler,251 handler: &mut dyn Handler,
201) -> Result<String> {252) -> Result<Vec<u8>> {
202 Ok(run_nix_inner_raw(str, cmd, true, handler, None)253 Ok(run_nix_inner_raw(str, cmd, true, handler, None)
203 .await?254 .await?
204 .expect("has out"))255 .expect("has out"))
208 assert!(v.is_none());259 assert!(v.is_none());
209 Ok(())260 Ok(())
210}261}
262async fn run_nix_inner_stdout_ssh(
263 str: String,
264 cmd: OwningCommand<Arc<Session>>,
265 handler: &mut dyn Handler,
266) -> Result<Vec<u8>> {
267 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
268 .await?
269 .expect("has out"))
270}
271async fn run_nix_inner_ssh(
272 str: String,
273 cmd: OwningCommand<Arc<Session>>,
274 handler: &mut dyn Handler,
275) -> Result<()> {
276 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
277 assert!(v.is_none());
278 Ok(())
279}
211280
212pub trait Handler: Send {281pub trait Handler: Send {
213 fn handle_line(&mut self, e: &str);282 fn handle_line(&mut self, e: &str);
468 want_stdout: bool,537 want_stdout: bool,
469 err_handler: &mut dyn Handler,538 err_handler: &mut dyn Handler,
470 mut out_handler: Option<&mut dyn Handler>,539 mut out_handler: Option<&mut dyn Handler>,
471) -> Result<Option<String>> {540) -> Result<Option<Vec<u8>>> {
472 cmd.stderr(Stdio::piped());541 cmd.stderr(Stdio::piped());
473 cmd.stdout(Stdio::piped());542 cmd.stdout(Stdio::piped());
474 let mut child = cmd.spawn()?;543 let mut child = cmd.spawn()?;
522 }591 }
523 }592 }
524593
525 Ok(out_buf.map(String::from_utf8).transpose()?)594 Ok(out_buf)
526}595}
596async fn run_nix_inner_raw_ssh(
597 str: String,
598 mut cmd: OwningCommand<Arc<Session>>,
599 want_stdout: bool,
600 err_handler: &mut dyn Handler,
601 mut out_handler: Option<&mut dyn Handler>,
602) -> Result<Option<Vec<u8>>> {
603 cmd.stderr(openssh::Stdio::piped());
604 cmd.stdout(openssh::Stdio::piped());
605 let mut child = cmd.spawn().await?;
606 let mut stderr = child.stderr().take().unwrap();
607 let stdout = child.stdout().take().unwrap();
608 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
609 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
610 let mut ob = want_stdout
611 .then(|| out.take().unwrap())
612 .unwrap_or_else(|| Box::new(EmptyAsyncRead));
613 let mut ol = (!want_stdout)
614 .then(|| out.take().unwrap())
615 .unwrap_or_else(|| Box::new(EmptyAsyncRead));
616 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
617 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
618
619 // while let Some(line) = read.next().await? {}
620
621 let mut out_buf = if want_stdout { Some(vec![]) } else { None };
622
623 let mut wait_future = pin::pin!(child.wait());
624 loop {
625 select! {
626 e = err.next() => {
627 if let Some(e) = e {
628 let e = e?;
629 err_handler.handle_line(&e);
630 }
631 },
632 o = ob.next() => {
633 if let Some(o) = o {
634 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
635 }
636 },
637 o = ol.next() => {
638 if let Some(o) = o {
639 let o = o?;
640 if let Some(out) = out_handler.as_mut() {
641 out.handle_line(&o)
642 } else {
643 err_handler.handle_line(&o)
644 }
645 // out_handler.handle_info(&o);
646 }
647 },
648 code = &mut wait_future => {
649 let code = code?;
650 if !code.success() {
651 anyhow::bail!("command '{str}' failed with status {}", code);
652 }
653 break;
654 }
655 }
656 }
657
658 Ok(out_buf)
659}
527660
528pub trait ErrorRecorder: Send {661pub trait ErrorRecorder: Send {
529 /// Return true to discard message from logging662 /// Return true to discard message from logging
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
 use std::{
 	env::current_dir,
-	ffi::OsString,
+	ffi::{OsStr, OsString},
 	io::Write,
 	ops::Deref,
 	path::PathBuf,
-	sync::{Arc, Mutex, MutexGuard},
+	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
 use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	async fn open_session(&self) -> Result<openssh::Session> {
-		let mut session = SessionBuilder::default();
+	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		// FIXME: TOCTOU
+		if let Some(session) = &self.session.get() {
+			return Ok((*session).clone());
+		};
+		let session = SessionBuilder::default();
 
-		session
+		let session = session
 			.connect(&self.name)
 			.await
-			.map_err(|e| anyhow!("ssh error: {e}"))
+			.map_err(|e| anyhow!("ssh error: {e}"))?;
+		let session = Arc::new(session);
+		self.session.set(session.clone()).expect("TOCTOU happened");
+		Ok(session)
+	}
+	pub async fn mktemp_dir(&self) -> Result<String> {
+		let mut cmd = self.cmd("mktemp").await?;
+		cmd.arg("-d");
+		let path = cmd.run_string().await?;
+		Ok(path.trim_end().to_owned())
 	}
+	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_bytes().await
+	}
+	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+		let mut cmd = self.cmd("cat").await?;
+		cmd.arg(path);
+		cmd.run_string().await
+	}
+	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+		let session = self.open_session().await?;
+		Ok(MyCommand::new_on(cmd, session))
+	}
+
+	pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 }
 
 impl Config {
@@ -96,12 +135,21 @@
 		command.run_string().await
 	}
 
+	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+		Ok(ConfigHost {
+			name: name.to_owned(),
+			session: OnceLock::new(),
+		})
+	}
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let fleet_field = &self.fleet_field;
 		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
 		let mut out = vec![];
 		for name in names {
-			out.push(ConfigHost { name })
+			out.push(ConfigHost {
+				name,
+				session: OnceLock::new(),
+			})
 		}
 		Ok(out)
 	}
@@ -152,19 +200,6 @@
 		host_secrets.insert(secret, value);
 	}
 
-	pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
-		let data = z85::encode(&data);
-		let mut cmd = MyCommand::new("fleet-install-secrets");
-		cmd.arg("decrypt").eqarg("--secret", data);
-		cmd = cmd.sudo().ssh(host);
-		let encoded = cmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		z85::decode(encoded).context("bad encoded data? outdated host?")
-	}
 	pub async fn reencrypt_on_host(
 		&self,
 		host: &str,
modifiednixos/meta.nixdiffbeforeafterboth
--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
 {
+  lib,
+  pkgs,
+  ...
+}:
+with lib; {
   options = with types; {
+    nixpkgs.resolvedPkgs = mkOption {
+      type = types.pkgs // {description = "nixpkgs.pkgs";};
+      description = "Value of pkgs";
+    };
     tags = mkOption {
       type = listOf str;
       description = "Host tags";
-      default = [ ];
+      default = [];
     };
     network = mkOption {
       type = submodule {
@@ -13,12 +20,12 @@
           internalIps = mkOption {
             type = listOf str;
             description = "Internal ips";
-            default = [ ];
+            default = [];
           };
           externalIps = mkOption {
             type = listOf str;
             description = "External ips";
-            default = [ ];
+            default = [];
           };
         };
       };
@@ -29,7 +36,8 @@
     };
   };
   config = {
-    tags = [ "all" ];
-    network = { };
+    tags = ["all"];
+    network = {};
+    nixpkgs.resolvedPkgs = pkgs;
   };
 }