difftreelog
refactor shell abstraction
in: trunk
6 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth472 ($field:ident $($tt:tt)*) => {{472 ($field:ident $($tt:tt)*) => {{473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};473 use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474 #[allow(unused_mut, reason = "might be used if indexed")]474 #[allow(unused_mut, reason = "might be used if indexed")]475 let mut out = NixExprBuilder::field($field);475 let mut out = NixExprBuilder::field($field.clone());476 nix_expr_inner!(@field(out) $($tt)*);476 nix_expr_inner!(@field(out) $($tt)*);477 out477 out478 }};478 }};cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth294 let drv = nix_go!(fleet_field.buildSystems(Obj {294 let drv = nix_go!(295 fleet_field.buildSystems(Obj {295 localSystem: { config.local_system.clone() }296 localSystem: { config.local_system.clone() }296 }));297 })[{ action.build_attr() }][{ host }]298 );297 let outputs = drv.build().await.map_err(|e| {299 let outputs = drv.build().await.map_err(|e| {298 if action.build_attr() == "sdImage" {300 if action.build_attr() == "sdImage" {cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth1use crate::{1use crate::{2 command::MyCommand,2 fleetdata::{FleetSecret, FleetSharedSecret},3 fleetdata::{FleetSecret, FleetSharedSecret},3 host::Config,4 host::Config,4 nix_go, nix_go_json,5 nix_go, nix_go_json,12 collections::HashSet,13 collections::HashSet,13 io::{self, Cursor, Read},14 io::{self, Cursor, Read},14 path::PathBuf,15 path::PathBuf,16 sync::Arc,15};17};16use tabled::{Table, Tabled};18use tabled::{Table, Tabled};17use tokio::fs::read_to_string;19use tokio::fs::read_to_string;97 Secret::InvokeGenerator => {99 Secret::InvokeGenerator => {98 let config_field = &config.config_unchecked_field;100 let config_field = &config.config_unchecked_field;99101100 let generate_impure =102 let secret =101 nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);103 nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);104 let generate_impure = nix_go!(secret.generateImpure);102 let on = nix_go!(generate_impure.on);105 let on = nix_go!(generate_impure.on);103 let call_package = nix_go!(106 let call_package = nix_go!(104 config_field.buildableSystems(Obj {107 config_field.buildableSystems(Obj {105 localSystem: { config.local_system.clone() }108 localSystem: { config.local_system.clone() }106 })[on]109 })[on]107 .config110 .config108 .nixpkgs111 .nixpkgs109 .pkgs112 .resolvedPkgs110 .callPackage113 .callPackage111 );114 );112 let generator = nix_go!(call_package(generate_impure.generator));115 let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));113 let built = generator.build().await?;116 let built = &generator.build().await?["out"];114 // .as_json().await?;117 let mut nix = MyCommand::new("nix");118 let on: String = on.as_json().await?;119 nix.arg("copy")120 .arg("--substitute-on-destination")121 .comparg("--to", format!("ssh-ng://{on}"))122 .arg(built);123 nix.run_nix().await?;124125 let session = config.host(&on).await?;126127 let owners: Vec<String> = nix_go_json!(secret.expectedOwners);128 dbg!(&owners);129130 let mut recipients = String::new();131 for owner in owners {132 let key = config.key(&owner).await?;133 recipients.push_str(&format!("-r \"{key}\" "));134 }135 recipients.push_str("-e");136137 // FIXME: security: created directory might be accessible to other users138 // This shouldn't be much of a concern, as data is encrypted right after creation, yet139 // still better to have.140 let tempdir = session.mktemp_dir().await?;141142 let mut gen = session.cmd(built).await?;143 gen.env("rageArgs", recipients).env("out", &tempdir);144 gen.run().await?;145146 {147 let marker = session.read_file_text(format!("{tempdir}/marker")).await?;148 ensure!(marker == "SUCCESS", "generation not succeeded");149 }150151 let public = session152 .read_file_bin(format!("{tempdir}/public"))153 .await154 .ok();155 let secret = session156 .read_file_bin(format!("{tempdir}/secret"))157 .await158 .ok();159 if let Some(secret) = &secret {160 ensure!(161 age::Decryptor::new(Cursor::new(&secret)).is_ok(),162 "builder produced non-encrypted value as secret, this is highly insecure"163 );164 }115 dbg!(&built);165 dbg!(&secret);166 // // .as_json().await?;167 // dbg!(&built);116 }168 }117 Secret::ForceKeys => {169 Secret::ForceKeys => {118 for host in config.list_hosts().await? {170 for host in config.list_hosts().await? {249 if secret.secret.is_empty() {301 if secret.secret.is_empty() {250 bail!("no secret {name}");302 bail!("no secret {name}");251 }303 }304 let host = config.host(&machine).await?;252 let data = config.decrypt_on_host(&machine, secret.secret).await?;305 let data = host.decrypt(secret.secret).await?;253 if plaintext {306 if plaintext {254 let s = String::from_utf8(data).context("output is not utf8")?;307 let s = String::from_utf8(data).context("output is not utf8")?;255 print!("{s}");308 print!("{s}");cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{1use std::{2 collections::HashMap,2 collections::HashMap,3 ffi::OsStr,3 ffi::OsStr,4 pin,4 process::Stdio,5 process::Stdio,5 sync::{Arc, Mutex},6 sync::{Arc, Mutex},6 task::Poll,7 task::Poll,10use futures::StreamExt;11use futures::StreamExt;11use itertools::Either;12use itertools::Either;12use once_cell::sync::Lazy;13use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use openssh::{OverSsh, OwningCommand, Session};14use regex::Regex;15use regex::Regex;15use serde::{de::Visitor, Deserialize};16use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio::{io::AsyncRead, process::Command, select};44 ssh_session: Option<Arc<Session>>,45 ssh_session: Option<Arc<Session>>,45}46}46impl MyCommand {47impl MyCommand {48 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: Some(session),55 }56 }47 pub fn new(cmd: impl AsRef<OsStr>) -> Self {57 pub fn new(cmd: impl AsRef<OsStr>) -> Self {48 assert!(!cmd.as_ref().is_empty());58 assert!(!cmd.as_ref().is_empty());49 Self {59 Self {67 out77 out68 }78 }7980 /// Translates environment variables into env command execution.81 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).82 ///83 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.84 /// Figure out some way to transfer environment using stdio?85 fn translate_env_into_env(self) -> Self {86 if self.env.is_empty() {87 return self;88 }89 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {94 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));96 }97 out.arg(self.command);98 out.args(self.args);99100 out101 }69 fn into_string(self) -> String {102 fn into_string(self) -> String {70 let mut out = String::new();103 let mut out = String::new();71 if !self.env.is_empty() {104 if !self.env.is_empty() {98 }131 }99 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {132 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100 Ok(if let Some(session) = self.ssh_session.clone() {133 Ok(if let Some(session) = self.ssh_session.clone() {101 let cmd = self.into_command();134 let cmd = self.translate_env_into_env().into_command();102 Either::Right(135 Either::Right(103 cmd.over_ssh(session)136 cmd.over_ssh(session)104 .map_err(|e| anyhow!("ssh error: {e}"))?,137 .map_err(|e| anyhow!("ssh error: {e}"))?,126 self.arg(value);159 self.arg(value);127 self160 self128 }161 }162 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163 self.env164 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));165 self166 }129 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {167 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130 for arg in args.into_iter() {168 for arg in args.into_iter() {131 let arg = arg.as_ref();169 let arg = arg.as_ref();132 self.args.push(ostoutf8(arg));170 self.args.push(ostoutf8(arg));133 }171 }134 self172 self135 }173 }136 pub fn sudo(self) -> Self {174 pub fn sudo(mut self) -> Self {137 if std::env::var_os("NO_SUDO").is_some() {175 if std::env::var_os("NO_SUDO").is_some() {138 let mut out = Self::new("su");176 let mut out = Self::new("su");177 out.ssh_session = self.ssh_session.take();139 out.arg("-c").arg(self.into_string());178 out.arg("-c").arg(self.into_string());140 out179 out141 } else {180 } else {144 out183 out145 }184 }146 }185 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }147 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {148 let mut out = Self::new("ssh");191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();149 out.arg(on).arg("--");193 out.arg(on).arg("--");150 out.arg(self.into_string());194 out.arg(self.into_string());151 out195 out152 }196 }153 pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154 self.ssh_session = Some(session);155 self156 }157197158 pub async fn run(self) -> Result<()> {198 pub async fn run(self) -> Result<()> {159 let str = self.clone().into_string();199 let str = self.clone().into_string();160 let cmd = self.into_command();200 let cmd = self.into_command_new()?;201 match cmd {161 run_nix_inner(str, cmd, &mut PlainHandler).await?;202 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204 };162 Ok(())205 Ok(())163 }206 }164 pub async fn run_string(self) -> Result<String> {207 pub async fn run_string(self) -> Result<String> {208 let bytes = self.run_bytes().await?;209 Ok(String::from_utf8(bytes)?)210 }211 pub async fn run_bytes(self) -> Result<Vec<u8>> {165 let str = self.clone().into_string();212 let str = self.clone().into_string();166 let cmd = self.into_command();213 let cmd = self.into_command_new()?;167 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;214 let v = match cmd {215 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217 };168 Ok(v)218 Ok(v)169 }219 }170220171 pub async fn run_nix_string(self) -> Result<String> {221 pub async fn run_nix_string(self) -> Result<String> {172 let str = self.clone().into_string();222 let str = self.clone().into_string();173 let mut cmd = self.into_command();223 let mut cmd = self.into_command();174 cmd.arg("--log-format").arg("internal-json");224 cmd.arg("--log-format").arg("internal-json");175 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await225 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226 Ok(String::from_utf8(bytes)?)176 }227 }177 pub async fn run_nix(self) -> Result<()> {228 pub async fn run_nix(self) -> Result<()> {178 let str = self.clone().into_string();229 let str = self.clone().into_string();198 str: String,249 str: String,199 cmd: Command,250 cmd: Command,200 handler: &mut dyn Handler,251 handler: &mut dyn Handler,201) -> Result<String> {252) -> Result<Vec<u8>> {202 Ok(run_nix_inner_raw(str, cmd, true, handler, None)253 Ok(run_nix_inner_raw(str, cmd, true, handler, None)203 .await?254 .await?204 .expect("has out"))255 .expect("has out"))208 assert!(v.is_none());259 assert!(v.is_none());209 Ok(())260 Ok(())210}261}262async fn run_nix_inner_stdout_ssh(263 str: String,264 cmd: OwningCommand<Arc<Session>>,265 handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268 .await?269 .expect("has out"))270}271async fn run_nix_inner_ssh(272 str: String,273 cmd: OwningCommand<Arc<Session>>,274 handler: &mut dyn Handler,275) -> Result<()> {276 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277 assert!(v.is_none());278 Ok(())279}211280212pub trait Handler: Send {281pub trait Handler: Send {213 fn handle_line(&mut self, e: &str);282 fn handle_line(&mut self, e: &str);468 want_stdout: bool,537 want_stdout: bool,469 err_handler: &mut dyn Handler,538 err_handler: &mut dyn Handler,470 mut out_handler: Option<&mut dyn Handler>,539 mut out_handler: Option<&mut dyn Handler>,471) -> Result<Option<String>> {540) -> Result<Option<Vec<u8>>> {472 cmd.stderr(Stdio::piped());541 cmd.stderr(Stdio::piped());473 cmd.stdout(Stdio::piped());542 cmd.stdout(Stdio::piped());474 let mut child = cmd.spawn()?;543 let mut child = cmd.spawn()?;522 }591 }523 }592 }524593525 Ok(out_buf.map(String::from_utf8).transpose()?)594 Ok(out_buf)526}595}596async fn run_nix_inner_raw_ssh(597 str: String,598 mut cmd: OwningCommand<Arc<Session>>,599 want_stdout: bool,600 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;606 let mut stderr = child.stderr().take().unwrap();607 let stdout = child.stdout().take().unwrap();608 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610 let mut ob = want_stdout611 .then(|| out.take().unwrap())612 .unwrap_or_else(|| Box::new(EmptyAsyncRead));613 let mut ol = (!want_stdout)614 .then(|| out.take().unwrap())615 .unwrap_or_else(|| Box::new(EmptyAsyncRead));616 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619 // while let Some(line) = read.next().await? {}620621 let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623 let mut wait_future = pin::pin!(child.wait());624 loop {625 select! {626 e = err.next() => {627 if let Some(e) = e {628 let e = e?;629 err_handler.handle_line(&e);630 }631 },632 o = ob.next() => {633 if let Some(o) = o {634 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635 }636 },637 o = ol.next() => {638 if let Some(o) = o {639 let o = o?;640 if let Some(out) = out_handler.as_mut() {641 out.handle_line(&o)642 } else {643 err_handler.handle_line(&o)644 }645 // out_handler.handle_info(&o);646 }647 },648 code = &mut wait_future => {649 let code = code?;650 if !code.success() {651 anyhow::bail!("command '{str}' failed with status {}", code);652 }653 break;654 }655 }656 }657658 Ok(out_buf)659}527660528pub trait ErrorRecorder: Send {661pub trait ErrorRecorder: Send {529 /// Return true to discard message from logging662 /// Return true to discard message from loggingcmds/fleet/src/host.rsdiffbeforeafterboth1use std::{1use std::{2 env::current_dir,2 env::current_dir,3 ffi::OsString,3 ffi::{OsStr, OsString},4 io::Write,4 io::Write,5 ops::Deref,5 ops::Deref,6 path::PathBuf,6 path::PathBuf,7 sync::{Arc, Mutex, MutexGuard},7 sync::{Arc, Mutex, MutexGuard, OnceLock},8};8};9910use anyhow::{anyhow, bail, Context, Result};10use anyhow::{anyhow, bail, Context, Result};464647pub struct ConfigHost {47pub struct ConfigHost {48 pub name: String,48 pub name: String,49 pub session: OnceLock<Arc<openssh::Session>>,49}50}50impl ConfigHost {51impl ConfigHost {51 async fn open_session(&self) -> Result<openssh::Session> {52 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {53 // FIXME: TOCTOU54 if let Some(session) = &self.session.get() {55 return Ok((*session).clone());56 };52 let mut session = SessionBuilder::default();57 let session = SessionBuilder::default();535854 session59 let session = session55 .connect(&self.name)60 .connect(&self.name)56 .await61 .await57 .map_err(|e| anyhow!("ssh error: {e}"))62 .map_err(|e| anyhow!("ssh error: {e}"))?;63 let session = Arc::new(session);64 self.session.set(session.clone()).expect("TOCTOU happened");65 Ok(session)58 }66 }67 pub async fn mktemp_dir(&self) -> Result<String> {68 let mut cmd = self.cmd("mktemp").await?;69 cmd.arg("-d");70 let path = cmd.run_string().await?;71 Ok(path.trim_end().to_owned())72 }73 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {74 let mut cmd = self.cmd("cat").await?;75 cmd.arg(path);76 cmd.run_bytes().await77 }78 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {79 let mut cmd = self.cmd("cat").await?;80 cmd.arg(path);81 cmd.run_string().await82 }83 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {84 let session = self.open_session().await?;85 Ok(MyCommand::new_on(cmd, session))86 }8788 pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {89 let mut cmd = self.cmd("fleet-install-secrets").await?;90 cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));91 let encoded = cmd92 .sudo()93 .run_string()94 .await95 .context("failed to call remote host for decrypt")?;96 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")97 }59}98}609961impl Config {100impl Config {96 command.run_string().await135 command.run_string().await97 }136 }98137138 pub async fn host(&self, name: &str) -> Result<ConfigHost> {139 Ok(ConfigHost {140 name: name.to_owned(),141 session: OnceLock::new(),142 })143 }99 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {144 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {100 let fleet_field = &self.fleet_field;145 let fleet_field = &self.fleet_field;101 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;146 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;102 let mut out = vec![];147 let mut out = vec![];103 for name in names {148 for name in names {104 out.push(ConfigHost { name })149 out.push(ConfigHost {150 name,151 session: OnceLock::new(),152 })105 }153 }106 Ok(out)154 Ok(out)152 host_secrets.insert(secret, value);200 host_secrets.insert(secret, value);153 }201 }154202155 pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {156 let data = z85::encode(&data);157 let mut cmd = MyCommand::new("fleet-install-secrets");158 cmd.arg("decrypt").eqarg("--secret", data);159 cmd = cmd.sudo().ssh(host);160 let encoded = cmd161 .run_string()162 .await163 .context("failed to call remote host for decrypt")?164 .trim()165 .to_owned();166 z85::decode(encoded).context("bad encoded data? outdated host?")167 }168 pub async fn reencrypt_on_host(203 pub async fn reencrypt_on_host(169 &self,204 &self,170 host: &str,205 host: &str,nixos/meta.nixdiffbeforeafterboth1{ lib, ... }:1{2 lib,3 pkgs,4 ...5}:2with lib;6with lib; {3{4 options = with types; {7 options = with types; {8 nixpkgs.resolvedPkgs = mkOption {9 type = types.pkgs // {description = "nixpkgs.pkgs";};10 description = "Value of pkgs";11 };5 tags = mkOption {12 tags = mkOption {6 type = listOf str;13 type = listOf str;7 description = "Host tags";14 description = "Host tags";31 config = {38 config = {32 tags = [ "all" ];39 tags = ["all"];33 network = { };40 network = {};41 nixpkgs.resolvedPkgs = pkgs;34 };42 };35}43}3644