From a369041a95eb6849be03a621e6a11dc54ed9c2e6 Mon Sep 17 00:00:00 2001 From: Yaroslav Bolyukin Date: Thu, 28 Dec 2023 01:41:18 +0000 Subject: [PATCH] refactor: shell abstraction --- --- a/cmds/fleet/src/better_nix_eval.rs +++ b/cmds/fleet/src/better_nix_eval.rs @@ -472,7 +472,7 @@ ($field:ident $($tt:tt)*) => {{ use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner}; #[allow(unused_mut, reason = "might be used if indexed")] - let mut out = NixExprBuilder::field($field); + let mut out = NixExprBuilder::field($field.clone()); nix_expr_inner!(@field(out) $($tt)*); out }}; --- a/cmds/fleet/src/cmds/build_systems.rs +++ b/cmds/fleet/src/cmds/build_systems.rs @@ -291,9 +291,11 @@ info!("building"); let action = Action::from(self.subcommand.clone()); let fleet_field = &config.fleet_field; - let drv = nix_go!(fleet_field.buildSystems(Obj { - localSystem: { config.local_system.clone() } - })); + let drv = nix_go!( + fleet_field.buildSystems(Obj { + localSystem: { config.local_system.clone() } + })[{ action.build_attr() }][{ host }] + ); let outputs = drv.build().await.map_err(|e| { if action.build_attr() == "sdImage" { info!("sd-image build failed"); --- a/cmds/fleet/src/cmds/secrets/mod.rs +++ b/cmds/fleet/src/cmds/secrets/mod.rs @@ -1,4 +1,5 @@ use crate::{ + command::MyCommand, fleetdata::{FleetSecret, FleetSharedSecret}, host::Config, nix_go, nix_go_json, @@ -12,6 +13,7 @@ collections::HashSet, io::{self, Cursor, Read}, path::PathBuf, + sync::Arc, }; use tabled::{Table, Tabled}; use tokio::fs::read_to_string; @@ -97,8 +99,9 @@ Secret::InvokeGenerator => { let config_field = &config.config_unchecked_field; - let generate_impure = - nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure); + let secret = + nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]); + let generate_impure = nix_go!(secret.generateImpure); let on = nix_go!(generate_impure.on); let call_package = nix_go!( config_field.buildableSystems(Obj { @@ -106,13 +109,62 @@ })[on] .config .nixpkgs - .pkgs + .resolvedPkgs .callPackage ); - let generator = nix_go!(call_package(generate_impure.generator)); - let built = generator.build().await?; - // .as_json().await?; - dbg!(&built); + let generator = nix_go!(call_package(generate_impure.generator)(Obj {})); + let built = &generator.build().await?["out"]; + let mut nix = MyCommand::new("nix"); + let on: String = on.as_json().await?; + nix.arg("copy") + .arg("--substitute-on-destination") + .comparg("--to", format!("ssh-ng://{on}")) + .arg(built); + nix.run_nix().await?; + + let session = config.host(&on).await?; + + let owners: Vec = nix_go_json!(secret.expectedOwners); + dbg!(&owners); + + let mut recipients = String::new(); + for owner in owners { + let key = config.key(&owner).await?; + recipients.push_str(&format!("-r \"{key}\" ")); + } + recipients.push_str("-e"); + + // FIXME: security: created directory might be accessible to other users + // This shouldn't be much of a concern, as data is encrypted right after creation, yet + // still better to have. + let tempdir = session.mktemp_dir().await?; + + let mut gen = session.cmd(built).await?; + gen.env("rageArgs", recipients).env("out", &tempdir); + gen.run().await?; + + { + let marker = session.read_file_text(format!("{tempdir}/marker")).await?; + ensure!(marker == "SUCCESS", "generation not succeeded"); + } + + let public = session + .read_file_bin(format!("{tempdir}/public")) + .await + .ok(); + let secret = session + .read_file_bin(format!("{tempdir}/secret")) + .await + .ok(); + if let Some(secret) = &secret { + ensure!( + age::Decryptor::new(Cursor::new(&secret)).is_ok(), + "builder produced non-encrypted value as secret, this is highly insecure" + ); + } + dbg!(&secret); + // // .as_json().await?; + // dbg!(&built); } Secret::ForceKeys => { for host in config.list_hosts().await? { @@ -249,7 +301,8 @@ if secret.secret.is_empty() { bail!("no secret {name}"); } - let data = config.decrypt_on_host(&machine, secret.secret).await?; + let host = config.host(&machine).await?; + let data = host.decrypt(secret.secret).await?; if plaintext { let s = String::from_utf8(data).context("output is not utf8")?; print!("{s}"); --- a/cmds/fleet/src/command.rs +++ b/cmds/fleet/src/command.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, ffi::OsStr, + pin, process::Stdio, sync::{Arc, Mutex}, task::Poll, @@ -10,7 +11,7 @@ use futures::StreamExt; use itertools::Either; use once_cell::sync::Lazy; -use openssh::{OverSsh, Session}; +use openssh::{OverSsh, OwningCommand, Session}; use regex::Regex; use serde::{de::Visitor, Deserialize}; use tokio::{io::AsyncRead, process::Command, select}; @@ -44,6 +45,15 @@ ssh_session: Option>, } impl MyCommand { + pub fn new_on(cmd: impl AsRef, session: Arc) -> Self { + assert!(!cmd.as_ref().is_empty()); + Self { + command: ostoutf8(cmd), + args: vec![], + env: vec![], + ssh_session: Some(session), + } + } pub fn new(cmd: impl AsRef) -> Self { assert!(!cmd.as_ref().is_empty()); Self { @@ -66,6 +76,29 @@ out.extend(self.args); out } + + /// Translates environment variables into env command execution. + /// Required for ssh, as ssh don't allow to send environment variables (at least by default). + /// + /// FIXME: Insecure, as arguments might be seen by other users on the same machine. + /// Figure out some way to transfer environment using stdio? + fn translate_env_into_env(self) -> Self { + if self.env.is_empty() { + return self; + } + let mut out = Self::new("env"); + if let Some(session) = self.ssh_session { + out = out.ssh_session(session); + } + for (k, v) in self.env { + assert!(!k.contains('=')); + out.arg(format!("{k}={v}")); + } + out.arg(self.command); + out.args(self.args); + + out + } fn into_string(self) -> String { let mut out = String::new(); if !self.env.is_empty() { @@ -98,7 +131,7 @@ } fn into_command_new(self) -> Result>>> { Ok(if let Some(session) = self.ssh_session.clone() { - let cmd = self.into_command(); + let cmd = self.translate_env_into_env().into_command(); Either::Right( cmd.over_ssh(session) .map_err(|e| anyhow!("ssh error: {e}"))?, @@ -126,6 +159,11 @@ self.arg(value); self } + pub fn env(&mut self, name: impl AsRef, value: impl AsRef) -> &mut Self { + self.env + .push((name.as_ref().to_owned(), value.as_ref().to_owned())); + self + } pub fn args>(&mut self, args: impl IntoIterator) -> &mut Self { for arg in args.into_iter() { let arg = arg.as_ref(); @@ -133,9 +171,10 @@ } self } - pub fn sudo(self) -> Self { + pub fn sudo(mut self) -> Self { if std::env::var_os("NO_SUDO").is_some() { let mut out = Self::new("su"); + out.ssh_session = self.ssh_session.take(); out.arg("-c").arg(self.into_string()); out } else { @@ -144,27 +183,38 @@ out } } - pub fn ssh(self, on: impl AsRef) -> Self { + pub fn ssh_session(mut self, on: Arc) -> Self { + self.ssh_session = Some(on); + self + } + pub fn ssh(mut self, on: impl AsRef) -> Self { let mut out = Self::new("ssh"); + out.ssh_session = self.ssh_session.take(); out.arg(on).arg("--"); out.arg(self.into_string()); out } - pub fn over_ssh(mut self, session: Arc) -> Self { - self.ssh_session = Some(session); - self - } pub async fn run(self) -> Result<()> { let str = self.clone().into_string(); - let cmd = self.into_command(); - run_nix_inner(str, cmd, &mut PlainHandler).await?; + let cmd = self.into_command_new()?; + match cmd { + Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?, + Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?, + }; Ok(()) } pub async fn run_string(self) -> Result { + let bytes = self.run_bytes().await?; + Ok(String::from_utf8(bytes)?) + } + pub async fn run_bytes(self) -> Result> { let str = self.clone().into_string(); - let cmd = self.into_command(); - let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?; + let cmd = self.into_command_new()?; + let v = match cmd { + Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?, + Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?, + }; Ok(v) } @@ -172,7 +222,8 @@ let str = self.clone().into_string(); let mut cmd = self.into_command(); cmd.arg("--log-format").arg("internal-json"); - run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await + let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?; + Ok(String::from_utf8(bytes)?) } pub async fn run_nix(self) -> Result<()> { let str = self.clone().into_string(); @@ -198,7 +249,7 @@ str: String, cmd: Command, handler: &mut dyn Handler, -) -> Result { +) -> Result> { Ok(run_nix_inner_raw(str, cmd, true, handler, None) .await? .expect("has out")) @@ -208,6 +259,24 @@ assert!(v.is_none()); Ok(()) } +async fn run_nix_inner_stdout_ssh( + str: String, + cmd: OwningCommand>, + handler: &mut dyn Handler, +) -> Result> { + Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None) + .await? + .expect("has out")) +} +async fn run_nix_inner_ssh( + str: String, + cmd: OwningCommand>, + handler: &mut dyn Handler, +) -> Result<()> { + let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?; + assert!(v.is_none()); + Ok(()) +} pub trait Handler: Send { fn handle_line(&mut self, e: &str); @@ -468,7 +537,7 @@ want_stdout: bool, err_handler: &mut dyn Handler, mut out_handler: Option<&mut dyn Handler>, -) -> Result> { +) -> Result>> { cmd.stderr(Stdio::piped()); cmd.stdout(Stdio::piped()); let mut child = cmd.spawn()?; @@ -522,7 +591,71 @@ } } - Ok(out_buf.map(String::from_utf8).transpose()?) + Ok(out_buf) +} +async fn run_nix_inner_raw_ssh( + str: String, + mut cmd: OwningCommand>, + want_stdout: bool, + err_handler: &mut dyn Handler, + mut out_handler: Option<&mut dyn Handler>, +) -> Result>> { + cmd.stderr(openssh::Stdio::piped()); + cmd.stdout(openssh::Stdio::piped()); + let mut child = cmd.spawn().await?; + let mut stderr = child.stderr().take().unwrap(); + let stdout = child.stdout().take().unwrap(); + let mut err = FramedRead::new(&mut stderr, LinesCodec::new()); + let mut out: Option> = Some(Box::new(stdout)); + let mut ob = want_stdout + .then(|| out.take().unwrap()) + .unwrap_or_else(|| Box::new(EmptyAsyncRead)); + let mut ol = (!want_stdout) + .then(|| out.take().unwrap()) + .unwrap_or_else(|| Box::new(EmptyAsyncRead)); + let mut ob = FramedRead::new(&mut ob, BytesCodec::new()); + let mut ol = FramedRead::new(&mut ol, LinesCodec::new()); + + // while let Some(line) = read.next().await? {} + + let mut out_buf = if want_stdout { Some(vec![]) } else { None }; + + let mut wait_future = pin::pin!(child.wait()); + loop { + select! { + e = err.next() => { + if let Some(e) = e { + let e = e?; + err_handler.handle_line(&e); + } + }, + o = ob.next() => { + if let Some(o) = o { + out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?); + } + }, + o = ol.next() => { + if let Some(o) = o { + let o = o?; + if let Some(out) = out_handler.as_mut() { + out.handle_line(&o) + } else { + err_handler.handle_line(&o) + } + // out_handler.handle_info(&o); + } + }, + code = &mut wait_future => { + let code = code?; + if !code.success() { + anyhow::bail!("command '{str}' failed with status {}", code); + } + break; + } + } + } + + Ok(out_buf) } pub trait ErrorRecorder: Send { --- a/cmds/fleet/src/host.rs +++ b/cmds/fleet/src/host.rs @@ -1,10 +1,10 @@ use std::{ env::current_dir, - ffi::OsString, + ffi::{OsStr, OsString}, io::Write, ops::Deref, path::PathBuf, - sync::{Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard, OnceLock}, }; use anyhow::{anyhow, bail, Context, Result}; @@ -46,16 +46,55 @@ pub struct ConfigHost { pub name: String, + pub session: OnceLock>, } impl ConfigHost { - async fn open_session(&self) -> Result { - let mut session = SessionBuilder::default(); + pub async fn open_session(&self) -> Result> { + // FIXME: TOCTOU + if let Some(session) = &self.session.get() { + return Ok((*session).clone()); + }; + let session = SessionBuilder::default(); - session + let session = session .connect(&self.name) .await - .map_err(|e| anyhow!("ssh error: {e}")) + .map_err(|e| anyhow!("ssh error: {e}"))?; + let session = Arc::new(session); + self.session.set(session.clone()).expect("TOCTOU happened"); + Ok(session) + } + pub async fn mktemp_dir(&self) -> Result { + let mut cmd = self.cmd("mktemp").await?; + cmd.arg("-d"); + let path = cmd.run_string().await?; + Ok(path.trim_end().to_owned()) } + pub async fn read_file_bin(&self, path: impl AsRef) -> Result> { + let mut cmd = self.cmd("cat").await?; + cmd.arg(path); + cmd.run_bytes().await + } + pub async fn read_file_text(&self, path: impl AsRef) -> Result { + let mut cmd = self.cmd("cat").await?; + cmd.arg(path); + cmd.run_string().await + } + pub async fn cmd(&self, cmd: impl AsRef) -> Result { + let session = self.open_session().await?; + Ok(MyCommand::new_on(cmd, session)) + } + + pub async fn decrypt(&self, data: Vec) -> Result> { + let mut cmd = self.cmd("fleet-install-secrets").await?; + cmd.arg("decrypt").eqarg("--secret", z85::encode(&data)); + let encoded = cmd + .sudo() + .run_string() + .await + .context("failed to call remote host for decrypt")?; + z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?") + } } impl Config { @@ -96,12 +135,21 @@ command.run_string().await } + pub async fn host(&self, name: &str) -> Result { + Ok(ConfigHost { + name: name.to_owned(), + session: OnceLock::new(), + }) + } pub async fn list_hosts(&self) -> Result> { let fleet_field = &self.fleet_field; let names = nix_go!(fleet_field.configuredHosts).list_fields().await?; let mut out = vec![]; for name in names { - out.push(ConfigHost { name }) + out.push(ConfigHost { + name, + session: OnceLock::new(), + }) } Ok(out) } @@ -152,19 +200,6 @@ host_secrets.insert(secret, value); } - pub async fn decrypt_on_host(&self, host: &str, data: Vec) -> Result> { - let data = z85::encode(&data); - let mut cmd = MyCommand::new("fleet-install-secrets"); - cmd.arg("decrypt").eqarg("--secret", data); - cmd = cmd.sudo().ssh(host); - let encoded = cmd - .run_string() - .await - .context("failed to call remote host for decrypt")? - .trim() - .to_owned(); - z85::decode(encoded).context("bad encoded data? outdated host?") - } pub async fn reencrypt_on_host( &self, host: &str, --- a/nixos/meta.nix +++ b/nixos/meta.nix @@ -1,11 +1,18 @@ -{ lib, ... }: -with lib; { + lib, + pkgs, + ... +}: +with lib; { options = with types; { + nixpkgs.resolvedPkgs = mkOption { + type = types.pkgs // {description = "nixpkgs.pkgs";}; + description = "Value of pkgs"; + }; tags = mkOption { type = listOf str; description = "Host tags"; - default = [ ]; + default = []; }; network = mkOption { type = submodule { @@ -13,12 +20,12 @@ internalIps = mkOption { type = listOf str; description = "Internal ips"; - default = [ ]; + default = []; }; externalIps = mkOption { type = listOf str; description = "External ips"; - default = [ ]; + default = []; }; }; }; @@ -29,7 +36,8 @@ }; }; config = { - tags = [ "all" ]; - network = { }; + tags = ["all"]; + network = {}; + nixpkgs.resolvedPkgs = pkgs; }; } -- gitstuff