difftreelog
refactor shell abstraction
in: trunk
6 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -472,7 +472,7 @@
($field:ident $($tt:tt)*) => {{
use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};
#[allow(unused_mut, reason = "might be used if indexed")]
- let mut out = NixExprBuilder::field($field);
+ let mut out = NixExprBuilder::field($field.clone());
nix_expr_inner!(@field(out) $($tt)*);
out
}};
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -291,9 +291,11 @@
info!("building");
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
- let drv = nix_go!(fleet_field.buildSystems(Obj {
- localSystem: { config.local_system.clone() }
- }));
+ let drv = nix_go!(
+ fleet_field.buildSystems(Obj {
+ localSystem: { config.local_system.clone() }
+ })[{ action.build_attr() }][{ host }]
+ );
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
info!("sd-image build failed");
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth1use crate::{2 fleetdata::{FleetSecret, FleetSharedSecret},3 host::Config,4 nix_go, nix_go_json,5};6use anyhow::{anyhow, bail, ensure, Context, Result};7use chrono::{DateTime, Utc};8use clap::Parser;9use futures::{StreamExt, TryStreamExt};10use owo_colors::OwoColorize;11use std::{12 collections::HashSet,13 io::{self, Cursor, Read},14 path::PathBuf,15};16use tabled::{Table, Tabled};17use tokio::fs::read_to_string;18use tracing::{error, info, info_span, warn};1920#[derive(Parser)]21pub enum Secret {22 /// Force load host keys for all defined hosts23 ForceKeys,24 /// Add secret, data should be provided in stdin25 AddShared {26 /// Secret name27 name: String,28 /// Secret owners29 machines: Vec<String>,30 /// Override secret if already present31 #[clap(long)]32 force: bool,33 /// Secret public part34 #[clap(long)]35 public: Option<String>,36 /// Load public part from specified file37 #[clap(long)]38 public_file: Option<PathBuf>,3940 /// Create a notification on secret expiration41 #[clap(long)]42 expires_at: Option<DateTime<Utc>>,4344 /// Secret with this name already exists, override its value while keeping the same owners.45 #[clap(long)]46 re_add: bool,47 },48 /// Add secret, data should be provided in stdin49 Add {50 /// Secret name51 name: String,52 /// Secret owners53 machine: String,54 /// Override secret if already present55 #[clap(long)]56 force: bool,57 #[clap(long)]58 public: Option<String>,59 #[clap(long)]60 public_file: Option<PathBuf>,61 },62 /// Read secret from remote host, requires sudo on said host63 Read {64 name: String,65 machine: String,66 #[clap(long)]67 plaintext: bool,68 },69 UpdateShared {70 name: String,7172 #[clap(long)]73 machines: Option<Vec<String>>,7475 #[clap(long)]76 add_machines: Vec<String>,77 #[clap(long)]78 remove_machines: Vec<String>,7980 /// Which host should we use to decrypt81 #[clap(long)]82 prefer_identities: Vec<String>,83 },84 Regenerate {85 /// Which host should we use to decrypt, in case if reencryption is required, without86 /// regeneration87 #[clap(long)]88 prefer_identities: Vec<String>,89 },90 List {},91 InvokeGenerator,92}9394impl Secret {95 pub async fn run(self, config: &Config) -> Result<()> {96 match self {97 Secret::InvokeGenerator => {98 let config_field = &config.config_unchecked_field;99100 let generate_impure =101 nix_go!(config_field.sharedSecrets["kube-apiserver.pem"].generateImpure);102 let on = nix_go!(generate_impure.on);103 let call_package = nix_go!(104 config_field.buildableSystems(Obj {105 localSystem: { config.local_system.clone() }106 })[on]107 .config108 .nixpkgs109 .pkgs110 .callPackage111 );112 let generator = nix_go!(call_package(generate_impure.generator));113 let built = generator.build().await?;114 // .as_json().await?;115 dbg!(&built);116 }117 Secret::ForceKeys => {118 for host in config.list_hosts().await? {119 if config.should_skip(&host.name) {120 continue;121 }122 config.key(&host.name).await?;123 }124 }125 Secret::AddShared {126 mut machines,127 name,128 force,129 public,130 public_file,131 expires_at,132 re_add,133 } => {134 let exists = config.has_shared(&name);135 if exists && !force && !re_add {136 bail!("secret already defined");137 }138 if re_add {139 // Fixme: use clap to limit this usage140 ensure!(!force, "--force and --readd are not compatible");141 ensure!(exists, "secret doesn't exists");142 ensure!(143 machines.is_empty(),144 "you can't use machines argument for --readd"145 );146 let shared = config.shared_secret(&name)?;147 machines = shared.owners;148 }149150 let recipients = futures::stream::iter(machines.iter())151 .then(|m| config.recipient(m))152 .try_collect::<Vec<_>>()153 .await?;154155 let secret = {156 let mut input = vec![];157 io::stdin().read_to_end(&mut input)?;158159 if input.is_empty() {160 input161 } else {162 let mut encrypted = vec![];163 let recipients = recipients164 .iter()165 .cloned()166 .map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)167 .collect();168 let mut encryptor = age::Encryptor::with_recipients(recipients)169 .ok_or_else(|| anyhow!("no recipients provided"))?170 .wrap_output(&mut encrypted)?;171 io::copy(&mut Cursor::new(input), &mut encryptor)?;172 encryptor.finish()?;173 encrypted174 }175 };176 config.replace_shared(177 name,178 FleetSharedSecret {179 owners: machines,180 secret: FleetSecret {181 created_at: Utc::now(),182 expires_at,183 secret,184 public: match (public, public_file) {185 (Some(v), None) => Some(v),186 (None, Some(v)) => Some(read_to_string(v).await?),187 (Some(_), Some(_)) => {188 bail!("only public or public_file should be set")189 }190 (None, None) => None,191 },192 },193 },194 );195 }196 Secret::Add {197 machine,198 name,199 force,200 public,201 public_file,202 } => {203 let recipient = config.recipient(&machine).await?;204205 let secret = {206 let mut input = vec![];207 io::stdin().read_to_end(&mut input)?;208 if input.is_empty() {209 bail!("no data provided")210 }211212 let mut encrypted = vec![];213 let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;214 let mut encryptor = age::Encryptor::with_recipients(vec![recipient])215 .expect("recipients provided")216 .wrap_output(&mut encrypted)?;217 io::copy(&mut Cursor::new(input), &mut encryptor)?;218 encryptor.finish()?;219 encrypted220 };221222 if config.has_secret(&machine, &name) && !force {223 bail!("secret already defined");224 }225 config.insert_secret(226 &machine,227 name,228 FleetSecret {229 created_at: Utc::now(),230 expires_at: None,231 secret,232 public: match (public, public_file) {233 (Some(v), None) => Some(v),234 (None, Some(v)) => Some(std::fs::read_to_string(v)?),235 (Some(_), Some(_)) => bail!("only public or public_file should be set"),236 (None, None) => None,237 },238 },239 );240 }241 // TODO: Instead of using sudo, decode secret on remote machine242 #[allow(clippy::await_holding_refcell_ref)]243 Secret::Read {244 name,245 machine,246 plaintext,247 } => {248 let secret = config.host_secret(&machine, &name)?;249 if secret.secret.is_empty() {250 bail!("no secret {name}");251 }252 let data = config.decrypt_on_host(&machine, secret.secret).await?;253 if plaintext {254 let s = String::from_utf8(data).context("output is not utf8")?;255 print!("{s}");256 } else {257 println!("{}", z85::encode(&data));258 }259 }260 Secret::UpdateShared {261 name,262 machines,263 mut add_machines,264 mut remove_machines,265 prefer_identities,266 } => {267 if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {268 bail!("no operation");269 }270271 let mut secret = config.shared_secret(&name)?;272 if secret.secret.secret.is_empty() {273 bail!("no secret");274 }275276 let initial_machines = secret.owners.clone();277 let mut target_machines = secret.owners.clone();278 info!("Currently encrypted for {initial_machines:?}");279280 // ensure!(machines.is_some() || !add_machines.is_empty() || )281 if let Some(machines) = machines {282 ensure!(283 add_machines.is_empty() && remove_machines.is_empty(),284 "can't combine --machines and --add-machines/--remove-machines"285 );286 let target = initial_machines.iter().collect::<HashSet<_>>();287 let source = machines.iter().collect::<HashSet<_>>();288 for removed in target.difference(&source) {289 remove_machines.push((*removed).clone());290 }291 for added in source.difference(&target) {292 add_machines.push((*added).clone());293 }294 }295296 for machine in &remove_machines {297 let mut removed = false;298 while let Some(pos) = target_machines.iter().position(|m| m == machine) {299 target_machines.swap_remove(pos);300 removed = true;301 }302 if !removed {303 warn!("secret is not enabled for {machine}");304 }305 }306 for machine in &add_machines {307 if target_machines.iter().any(|m| m == machine) {308 warn!("secret is already added to {machine}");309 } else {310 target_machines.push(machine.to_owned());311 }312 }313 if !remove_machines.is_empty() {314 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");315 }316317 if target_machines.is_empty() {318 info!("no machines left for secret, removing it");319 config.remove_shared(&name);320 return Ok(());321 }322323 if target_machines == initial_machines {324 warn!("secret owners are already correct");325 return Ok(());326 }327328 let identity_holder = if !prefer_identities.is_empty() {329 prefer_identities330 .iter()331 .find(|i| initial_machines.iter().any(|s| s == *i))332 } else {333 secret.owners.first()334 };335 let Some(identity_holder) = identity_holder else {336 bail!("no available holder found");337 };338 let target_recipients = futures::stream::iter(&target_machines)339 .then(|m| async { config.key(m).await })340 .collect::<Vec<_>>()341 .await;342 let target_recipients =343 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;344345 let encrypted = config346 .reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)347 .await?;348349 secret.owners = target_machines;350 secret.secret.secret = encrypted;351 config.replace_shared(name, secret);352 }353 Secret::Regenerate { prefer_identities } => {354 {355 let expected_shared_set = config356 .list_configured_shared()357 .await?358 .into_iter()359 .collect::<HashSet<_>>();360 let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();361 for removed in expected_shared_set.difference(&shared_set) {362 error!("secret needs to be generated: {removed}")363 }364 }365 let mut to_remove = Vec::new();366 for name in &config.list_shared() {367 info!("updating secret: {name}");368 let mut data = config.shared_secret(name)?;369 let config_field = &config.config_field;370 let expected_owners: Vec<String> =371 nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);372 if expected_owners.is_empty() {373 warn!("secret was removed from fleet config: {name}, removing from data");374 to_remove.push(name.to_string());375 continue;376 }377 let set = data.owners.iter().collect::<HashSet<_>>();378 let expected_set = expected_owners.iter().collect::<HashSet<_>>();379 let should_remove = set.difference(&expected_set).next().is_some();380 if set != expected_set {381 let owner_dependent: bool =382 nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);383 if !owner_dependent {384 warn!("reencrypting secret '{name}' for new owner set");385 // TODO: force regeneration386 if should_remove {387 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");388 }389390 let identity_holder = if !prefer_identities.is_empty() {391 prefer_identities392 .iter()393 .find(|i| data.owners.iter().any(|s| s == *i))394 } else {395 data.owners.first()396 };397 let Some(identity_holder) = identity_holder else {398 bail!("no available holder found");399 };400401 let target_recipients = futures::stream::iter(&expected_owners)402 .then(|m| async { config.key(m).await })403 .collect::<Vec<_>>()404 .await;405 let target_recipients =406 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;407408 let encrypted = config409 .reencrypt_on_host(410 identity_holder,411 data.secret.secret,412 target_recipients,413 )414 .await?;415416 data.secret.secret = encrypted;417 data.owners = expected_owners;418 config.replace_shared(name.to_owned(), data);419 } else {420 error!("secret '{name}' should be regenerated manually");421 }422 } else {423 info!("secret data is ok")424 }425 }426 for k in to_remove {427 config.remove_shared(&k);428 }429 }430 Secret::List {} => {431 let _span = info_span!("loading secrets").entered();432 let configured = config.list_configured_shared().await?;433 #[derive(Tabled)]434 struct SecretDisplay {435 #[tabled(rename = "Name")]436 name: String,437 #[tabled(rename = "Owners")]438 owners: String,439 }440 let mut table = vec![];441 for name in configured.iter().cloned() {442 let config = config.clone();443 let expected_owners = config.shared_secret_expected_owners(&name).await?;444 let data = config.shared_secret(&name)?;445 let owners = data446 .owners447 .iter()448 .map(|o| {449 if expected_owners.contains(o) {450 o.green().to_string()451 } else {452 o.red().to_string()453 }454 })455 .collect::<Vec<_>>();456 table.push(SecretDisplay {457 owners: owners.join(", "),458 name,459 })460 }461 info!("loaded\n{}", Table::new(table).to_string())462 }463 }464 Ok(())465 }466}1use crate::{2 command::MyCommand,3 fleetdata::{FleetSecret, FleetSharedSecret},4 host::Config,5 nix_go, nix_go_json,6};7use anyhow::{anyhow, bail, ensure, Context, Result};8use chrono::{DateTime, Utc};9use clap::Parser;10use futures::{StreamExt, TryStreamExt};11use owo_colors::OwoColorize;12use std::{13 collections::HashSet,14 io::{self, Cursor, Read},15 path::PathBuf,16 sync::Arc,17};18use tabled::{Table, Tabled};19use tokio::fs::read_to_string;20use tracing::{error, info, info_span, warn};2122#[derive(Parser)]23pub enum Secret {24 /// Force load host keys for all defined hosts25 ForceKeys,26 /// Add secret, data should be provided in stdin27 AddShared {28 /// Secret name29 name: String,30 /// Secret owners31 machines: Vec<String>,32 /// Override secret if already present33 #[clap(long)]34 force: bool,35 /// Secret public part36 #[clap(long)]37 public: Option<String>,38 /// Load public part from specified file39 #[clap(long)]40 public_file: Option<PathBuf>,4142 /// Create a notification on secret expiration43 #[clap(long)]44 expires_at: Option<DateTime<Utc>>,4546 /// Secret with this name already exists, override its value while keeping the same owners.47 #[clap(long)]48 re_add: bool,49 },50 /// Add secret, data should be provided in stdin51 Add {52 /// Secret name53 name: String,54 /// Secret owners55 machine: String,56 /// Override secret if already present57 #[clap(long)]58 force: bool,59 #[clap(long)]60 public: Option<String>,61 #[clap(long)]62 public_file: Option<PathBuf>,63 },64 /// Read secret from remote host, requires sudo on said host65 Read {66 name: String,67 machine: String,68 #[clap(long)]69 plaintext: bool,70 },71 UpdateShared {72 name: String,7374 #[clap(long)]75 machines: Option<Vec<String>>,7677 #[clap(long)]78 add_machines: Vec<String>,79 #[clap(long)]80 remove_machines: Vec<String>,8182 /// Which host should we use to decrypt83 #[clap(long)]84 prefer_identities: Vec<String>,85 },86 Regenerate {87 /// Which host should we use to decrypt, in case if reencryption is required, without88 /// regeneration89 #[clap(long)]90 prefer_identities: Vec<String>,91 },92 List {},93 InvokeGenerator,94}9596impl Secret {97 pub async fn run(self, config: &Config) -> Result<()> {98 match self {99 Secret::InvokeGenerator => {100 let config_field = &config.config_unchecked_field;101102 let secret =103 nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);104 let generate_impure = nix_go!(secret.generateImpure);105 let on = nix_go!(generate_impure.on);106 let call_package = nix_go!(107 config_field.buildableSystems(Obj {108 localSystem: { config.local_system.clone() }109 })[on]110 .config111 .nixpkgs112 .resolvedPkgs113 .callPackage114 );115 let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));116 let built = &generator.build().await?["out"];117 let mut nix = MyCommand::new("nix");118 let on: String = on.as_json().await?;119 nix.arg("copy")120 .arg("--substitute-on-destination")121 .comparg("--to", format!("ssh-ng://{on}"))122 .arg(built);123 nix.run_nix().await?;124125 let session = config.host(&on).await?;126127 let owners: Vec<String> = nix_go_json!(secret.expectedOwners);128 dbg!(&owners);129130 let mut recipients = String::new();131 for owner in owners {132 let key = config.key(&owner).await?;133 recipients.push_str(&format!("-r \"{key}\" "));134 }135 recipients.push_str("-e");136137 // FIXME: security: created directory might be accessible to other users138 // This shouldn't be much of a concern, as data is encrypted right after creation, yet139 // still better to have.140 let tempdir = session.mktemp_dir().await?;141142 let mut gen = session.cmd(built).await?;143 gen.env("rageArgs", recipients).env("out", &tempdir);144 gen.run().await?;145146 {147 let marker = session.read_file_text(format!("{tempdir}/marker")).await?;148 ensure!(marker == "SUCCESS", "generation not succeeded");149 }150151 let public = session152 .read_file_bin(format!("{tempdir}/public"))153 .await154 .ok();155 let secret = session156 .read_file_bin(format!("{tempdir}/secret"))157 .await158 .ok();159 if let Some(secret) = &secret {160 ensure!(161 age::Decryptor::new(Cursor::new(&secret)).is_ok(),162 "builder produced non-encrypted value as secret, this is highly insecure"163 );164 }165 dbg!(&secret);166 // // .as_json().await?;167 // dbg!(&built);168 }169 Secret::ForceKeys => {170 for host in config.list_hosts().await? {171 if config.should_skip(&host.name) {172 continue;173 }174 config.key(&host.name).await?;175 }176 }177 Secret::AddShared {178 mut machines,179 name,180 force,181 public,182 public_file,183 expires_at,184 re_add,185 } => {186 let exists = config.has_shared(&name);187 if exists && !force && !re_add {188 bail!("secret already defined");189 }190 if re_add {191 // Fixme: use clap to limit this usage192 ensure!(!force, "--force and --readd are not compatible");193 ensure!(exists, "secret doesn't exists");194 ensure!(195 machines.is_empty(),196 "you can't use machines argument for --readd"197 );198 let shared = config.shared_secret(&name)?;199 machines = shared.owners;200 }201202 let recipients = futures::stream::iter(machines.iter())203 .then(|m| config.recipient(m))204 .try_collect::<Vec<_>>()205 .await?;206207 let secret = {208 let mut input = vec![];209 io::stdin().read_to_end(&mut input)?;210211 if input.is_empty() {212 input213 } else {214 let mut encrypted = vec![];215 let recipients = recipients216 .iter()217 .cloned()218 .map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)219 .collect();220 let mut encryptor = age::Encryptor::with_recipients(recipients)221 .ok_or_else(|| anyhow!("no recipients provided"))?222 .wrap_output(&mut encrypted)?;223 io::copy(&mut Cursor::new(input), &mut encryptor)?;224 encryptor.finish()?;225 encrypted226 }227 };228 config.replace_shared(229 name,230 FleetSharedSecret {231 owners: machines,232 secret: FleetSecret {233 created_at: Utc::now(),234 expires_at,235 secret,236 public: match (public, public_file) {237 (Some(v), None) => Some(v),238 (None, Some(v)) => Some(read_to_string(v).await?),239 (Some(_), Some(_)) => {240 bail!("only public or public_file should be set")241 }242 (None, None) => None,243 },244 },245 },246 );247 }248 Secret::Add {249 machine,250 name,251 force,252 public,253 public_file,254 } => {255 let recipient = config.recipient(&machine).await?;256257 let secret = {258 let mut input = vec![];259 io::stdin().read_to_end(&mut input)?;260 if input.is_empty() {261 bail!("no data provided")262 }263264 let mut encrypted = vec![];265 let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;266 let mut encryptor = age::Encryptor::with_recipients(vec![recipient])267 .expect("recipients provided")268 .wrap_output(&mut encrypted)?;269 io::copy(&mut Cursor::new(input), &mut encryptor)?;270 encryptor.finish()?;271 encrypted272 };273274 if config.has_secret(&machine, &name) && !force {275 bail!("secret already defined");276 }277 config.insert_secret(278 &machine,279 name,280 FleetSecret {281 created_at: Utc::now(),282 expires_at: None,283 secret,284 public: match (public, public_file) {285 (Some(v), None) => Some(v),286 (None, Some(v)) => Some(std::fs::read_to_string(v)?),287 (Some(_), Some(_)) => bail!("only public or public_file should be set"),288 (None, None) => None,289 },290 },291 );292 }293 // TODO: Instead of using sudo, decode secret on remote machine294 #[allow(clippy::await_holding_refcell_ref)]295 Secret::Read {296 name,297 machine,298 plaintext,299 } => {300 let secret = config.host_secret(&machine, &name)?;301 if secret.secret.is_empty() {302 bail!("no secret {name}");303 }304 let host = config.host(&machine).await?;305 let data = host.decrypt(secret.secret).await?;306 if plaintext {307 let s = String::from_utf8(data).context("output is not utf8")?;308 print!("{s}");309 } else {310 println!("{}", z85::encode(&data));311 }312 }313 Secret::UpdateShared {314 name,315 machines,316 mut add_machines,317 mut remove_machines,318 prefer_identities,319 } => {320 if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {321 bail!("no operation");322 }323324 let mut secret = config.shared_secret(&name)?;325 if secret.secret.secret.is_empty() {326 bail!("no secret");327 }328329 let initial_machines = secret.owners.clone();330 let mut target_machines = secret.owners.clone();331 info!("Currently encrypted for {initial_machines:?}");332333 // ensure!(machines.is_some() || !add_machines.is_empty() || )334 if let Some(machines) = machines {335 ensure!(336 add_machines.is_empty() && remove_machines.is_empty(),337 "can't combine --machines and --add-machines/--remove-machines"338 );339 let target = initial_machines.iter().collect::<HashSet<_>>();340 let source = machines.iter().collect::<HashSet<_>>();341 for removed in target.difference(&source) {342 remove_machines.push((*removed).clone());343 }344 for added in source.difference(&target) {345 add_machines.push((*added).clone());346 }347 }348349 for machine in &remove_machines {350 let mut removed = false;351 while let Some(pos) = target_machines.iter().position(|m| m == machine) {352 target_machines.swap_remove(pos);353 removed = true;354 }355 if !removed {356 warn!("secret is not enabled for {machine}");357 }358 }359 for machine in &add_machines {360 if target_machines.iter().any(|m| m == machine) {361 warn!("secret is already added to {machine}");362 } else {363 target_machines.push(machine.to_owned());364 }365 }366 if !remove_machines.is_empty() {367 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");368 }369370 if target_machines.is_empty() {371 info!("no machines left for secret, removing it");372 config.remove_shared(&name);373 return Ok(());374 }375376 if target_machines == initial_machines {377 warn!("secret owners are already correct");378 return Ok(());379 }380381 let identity_holder = if !prefer_identities.is_empty() {382 prefer_identities383 .iter()384 .find(|i| initial_machines.iter().any(|s| s == *i))385 } else {386 secret.owners.first()387 };388 let Some(identity_holder) = identity_holder else {389 bail!("no available holder found");390 };391 let target_recipients = futures::stream::iter(&target_machines)392 .then(|m| async { config.key(m).await })393 .collect::<Vec<_>>()394 .await;395 let target_recipients =396 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;397398 let encrypted = config399 .reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)400 .await?;401402 secret.owners = target_machines;403 secret.secret.secret = encrypted;404 config.replace_shared(name, secret);405 }406 Secret::Regenerate { prefer_identities } => {407 {408 let expected_shared_set = config409 .list_configured_shared()410 .await?411 .into_iter()412 .collect::<HashSet<_>>();413 let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();414 for removed in expected_shared_set.difference(&shared_set) {415 error!("secret needs to be generated: {removed}")416 }417 }418 let mut to_remove = Vec::new();419 for name in &config.list_shared() {420 info!("updating secret: {name}");421 let mut data = config.shared_secret(name)?;422 let config_field = &config.config_field;423 let expected_owners: Vec<String> =424 nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);425 if expected_owners.is_empty() {426 warn!("secret was removed from fleet config: {name}, removing from data");427 to_remove.push(name.to_string());428 continue;429 }430 let set = data.owners.iter().collect::<HashSet<_>>();431 let expected_set = expected_owners.iter().collect::<HashSet<_>>();432 let should_remove = set.difference(&expected_set).next().is_some();433 if set != expected_set {434 let owner_dependent: bool =435 nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);436 if !owner_dependent {437 warn!("reencrypting secret '{name}' for new owner set");438 // TODO: force regeneration439 if should_remove {440 warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");441 }442443 let identity_holder = if !prefer_identities.is_empty() {444 prefer_identities445 .iter()446 .find(|i| data.owners.iter().any(|s| s == *i))447 } else {448 data.owners.first()449 };450 let Some(identity_holder) = identity_holder else {451 bail!("no available holder found");452 };453454 let target_recipients = futures::stream::iter(&expected_owners)455 .then(|m| async { config.key(m).await })456 .collect::<Vec<_>>()457 .await;458 let target_recipients =459 target_recipients.into_iter().collect::<Result<Vec<_>>>()?;460461 let encrypted = config462 .reencrypt_on_host(463 identity_holder,464 data.secret.secret,465 target_recipients,466 )467 .await?;468469 data.secret.secret = encrypted;470 data.owners = expected_owners;471 config.replace_shared(name.to_owned(), data);472 } else {473 error!("secret '{name}' should be regenerated manually");474 }475 } else {476 info!("secret data is ok")477 }478 }479 for k in to_remove {480 config.remove_shared(&k);481 }482 }483 Secret::List {} => {484 let _span = info_span!("loading secrets").entered();485 let configured = config.list_configured_shared().await?;486 #[derive(Tabled)]487 struct SecretDisplay {488 #[tabled(rename = "Name")]489 name: String,490 #[tabled(rename = "Owners")]491 owners: String,492 }493 let mut table = vec![];494 for name in configured.iter().cloned() {495 let config = config.clone();496 let expected_owners = config.shared_secret_expected_owners(&name).await?;497 let data = config.shared_secret(&name)?;498 let owners = data499 .owners500 .iter()501 .map(|o| {502 if expected_owners.contains(o) {503 o.green().to_string()504 } else {505 o.red().to_string()506 }507 })508 .collect::<Vec<_>>();509 table.push(SecretDisplay {510 owners: owners.join(", "),511 name,512 })513 }514 info!("loaded\n{}", Table::new(table).to_string())515 }516 }517 Ok(())518 }519}cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
ffi::OsStr,
+ pin,
process::Stdio,
sync::{Arc, Mutex},
task::Poll,
@@ -10,7 +11,7 @@
use futures::StreamExt;
use itertools::Either;
use once_cell::sync::Lazy;
-use openssh::{OverSsh, Session};
+use openssh::{OverSsh, OwningCommand, Session};
use regex::Regex;
use serde::{de::Visitor, Deserialize};
use tokio::{io::AsyncRead, process::Command, select};
@@ -44,6 +45,15 @@
ssh_session: Option<Arc<Session>>,
}
impl MyCommand {
+ pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {
+ assert!(!cmd.as_ref().is_empty());
+ Self {
+ command: ostoutf8(cmd),
+ args: vec![],
+ env: vec![],
+ ssh_session: Some(session),
+ }
+ }
pub fn new(cmd: impl AsRef<OsStr>) -> Self {
assert!(!cmd.as_ref().is_empty());
Self {
@@ -66,6 +76,29 @@
out.extend(self.args);
out
}
+
+ /// Translates environment variables into env command execution.
+ /// Required for ssh, as ssh don't allow to send environment variables (at least by default).
+ ///
+ /// FIXME: Insecure, as arguments might be seen by other users on the same machine.
+ /// Figure out some way to transfer environment using stdio?
+ fn translate_env_into_env(self) -> Self {
+ if self.env.is_empty() {
+ return self;
+ }
+ let mut out = Self::new("env");
+ if let Some(session) = self.ssh_session {
+ out = out.ssh_session(session);
+ }
+ for (k, v) in self.env {
+ assert!(!k.contains('='));
+ out.arg(format!("{k}={v}"));
+ }
+ out.arg(self.command);
+ out.args(self.args);
+
+ out
+ }
fn into_string(self) -> String {
let mut out = String::new();
if !self.env.is_empty() {
@@ -98,7 +131,7 @@
}
fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
Ok(if let Some(session) = self.ssh_session.clone() {
- let cmd = self.into_command();
+ let cmd = self.translate_env_into_env().into_command();
Either::Right(
cmd.over_ssh(session)
.map_err(|e| anyhow!("ssh error: {e}"))?,
@@ -126,6 +159,11 @@
self.arg(value);
self
}
+ pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
+ self.env
+ .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
+ self
+ }
pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
for arg in args.into_iter() {
let arg = arg.as_ref();
@@ -133,9 +171,10 @@
}
self
}
- pub fn sudo(self) -> Self {
+ pub fn sudo(mut self) -> Self {
if std::env::var_os("NO_SUDO").is_some() {
let mut out = Self::new("su");
+ out.ssh_session = self.ssh_session.take();
out.arg("-c").arg(self.into_string());
out
} else {
@@ -144,27 +183,38 @@
out
}
}
- pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
+ pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
+ self.ssh_session = Some(on);
+ self
+ }
+ pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
let mut out = Self::new("ssh");
+ out.ssh_session = self.ssh_session.take();
out.arg(on).arg("--");
out.arg(self.into_string());
out
}
- pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
- self.ssh_session = Some(session);
- self
- }
pub async fn run(self) -> Result<()> {
let str = self.clone().into_string();
- let cmd = self.into_command();
- run_nix_inner(str, cmd, &mut PlainHandler).await?;
+ let cmd = self.into_command_new()?;
+ match cmd {
+ Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,
+ };
Ok(())
}
pub async fn run_string(self) -> Result<String> {
+ let bytes = self.run_bytes().await?;
+ Ok(String::from_utf8(bytes)?)
+ }
+ pub async fn run_bytes(self) -> Result<Vec<u8>> {
let str = self.clone().into_string();
- let cmd = self.into_command();
- let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;
+ let cmd = self.into_command_new()?;
+ let v = match cmd {
+ Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,
+ Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,
+ };
Ok(v)
}
@@ -172,7 +222,8 @@
let str = self.clone().into_string();
let mut cmd = self.into_command();
cmd.arg("--log-format").arg("internal-json");
- run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
+ let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;
+ Ok(String::from_utf8(bytes)?)
}
pub async fn run_nix(self) -> Result<()> {
let str = self.clone().into_string();
@@ -198,7 +249,7 @@
str: String,
cmd: Command,
handler: &mut dyn Handler,
-) -> Result<String> {
+) -> Result<Vec<u8>> {
Ok(run_nix_inner_raw(str, cmd, true, handler, None)
.await?
.expect("has out"))
@@ -208,6 +259,24 @@
assert!(v.is_none());
Ok(())
}
+async fn run_nix_inner_stdout_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<Vec<u8>> {
+ Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)
+ .await?
+ .expect("has out"))
+}
+async fn run_nix_inner_ssh(
+ str: String,
+ cmd: OwningCommand<Arc<Session>>,
+ handler: &mut dyn Handler,
+) -> Result<()> {
+ let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
+ assert!(v.is_none());
+ Ok(())
+}
pub trait Handler: Send {
fn handle_line(&mut self, e: &str);
@@ -468,7 +537,7 @@
want_stdout: bool,
err_handler: &mut dyn Handler,
mut out_handler: Option<&mut dyn Handler>,
-) -> Result<Option<String>> {
+) -> Result<Option<Vec<u8>>> {
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn()?;
@@ -522,7 +591,71 @@
}
}
- Ok(out_buf.map(String::from_utf8).transpose()?)
+ Ok(out_buf)
+}
+async fn run_nix_inner_raw_ssh(
+ str: String,
+ mut cmd: OwningCommand<Arc<Session>>,
+ want_stdout: bool,
+ err_handler: &mut dyn Handler,
+ mut out_handler: Option<&mut dyn Handler>,
+) -> Result<Option<Vec<u8>>> {
+ cmd.stderr(openssh::Stdio::piped());
+ cmd.stdout(openssh::Stdio::piped());
+ let mut child = cmd.spawn().await?;
+ let mut stderr = child.stderr().take().unwrap();
+ let stdout = child.stdout().take().unwrap();
+ let mut err = FramedRead::new(&mut stderr, LinesCodec::new());
+ let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));
+ let mut ob = want_stdout
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ol = (!want_stdout)
+ .then(|| out.take().unwrap())
+ .unwrap_or_else(|| Box::new(EmptyAsyncRead));
+ let mut ob = FramedRead::new(&mut ob, BytesCodec::new());
+ let mut ol = FramedRead::new(&mut ol, LinesCodec::new());
+
+ // while let Some(line) = read.next().await? {}
+
+ let mut out_buf = if want_stdout { Some(vec![]) } else { None };
+
+ let mut wait_future = pin::pin!(child.wait());
+ loop {
+ select! {
+ e = err.next() => {
+ if let Some(e) = e {
+ let e = e?;
+ err_handler.handle_line(&e);
+ }
+ },
+ o = ob.next() => {
+ if let Some(o) = o {
+ out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);
+ }
+ },
+ o = ol.next() => {
+ if let Some(o) = o {
+ let o = o?;
+ if let Some(out) = out_handler.as_mut() {
+ out.handle_line(&o)
+ } else {
+ err_handler.handle_line(&o)
+ }
+ // out_handler.handle_info(&o);
+ }
+ },
+ code = &mut wait_future => {
+ let code = code?;
+ if !code.success() {
+ anyhow::bail!("command '{str}' failed with status {}", code);
+ }
+ break;
+ }
+ }
+ }
+
+ Ok(out_buf)
}
pub trait ErrorRecorder: Send {
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -1,10 +1,10 @@
use std::{
env::current_dir,
- ffi::OsString,
+ ffi::{OsStr, OsString},
io::Write,
ops::Deref,
path::PathBuf,
- sync::{Arc, Mutex, MutexGuard},
+ sync::{Arc, Mutex, MutexGuard, OnceLock},
};
use anyhow::{anyhow, bail, Context, Result};
@@ -46,16 +46,55 @@
pub struct ConfigHost {
pub name: String,
+ pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- async fn open_session(&self) -> Result<openssh::Session> {
- let mut session = SessionBuilder::default();
+ pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ // FIXME: TOCTOU
+ if let Some(session) = &self.session.get() {
+ return Ok((*session).clone());
+ };
+ let session = SessionBuilder::default();
- session
+ let session = session
.connect(&self.name)
.await
- .map_err(|e| anyhow!("ssh error: {e}"))
+ .map_err(|e| anyhow!("ssh error: {e}"))?;
+ let session = Arc::new(session);
+ self.session.set(session.clone()).expect("TOCTOU happened");
+ Ok(session)
+ }
+ pub async fn mktemp_dir(&self) -> Result<String> {
+ let mut cmd = self.cmd("mktemp").await?;
+ cmd.arg("-d");
+ let path = cmd.run_string().await?;
+ Ok(path.trim_end().to_owned())
}
+ pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_bytes().await
+ }
+ pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {
+ let mut cmd = self.cmd("cat").await?;
+ cmd.arg(path);
+ cmd.run_string().await
+ }
+ pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
+
+ pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
}
impl Config {
@@ -96,12 +135,21 @@
command.run_string().await
}
+ pub async fn host(&self, name: &str) -> Result<ConfigHost> {
+ Ok(ConfigHost {
+ name: name.to_owned(),
+ session: OnceLock::new(),
+ })
+ }
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
let fleet_field = &self.fleet_field;
let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;
let mut out = vec![];
for name in names {
- out.push(ConfigHost { name })
+ out.push(ConfigHost {
+ name,
+ session: OnceLock::new(),
+ })
}
Ok(out)
}
@@ -152,19 +200,6 @@
host_secrets.insert(secret, value);
}
- pub async fn decrypt_on_host(&self, host: &str, data: Vec<u8>) -> Result<Vec<u8>> {
- let data = z85::encode(&data);
- let mut cmd = MyCommand::new("fleet-install-secrets");
- cmd.arg("decrypt").eqarg("--secret", data);
- cmd = cmd.sudo().ssh(host);
- let encoded = cmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- z85::decode(encoded).context("bad encoded data? outdated host?")
- }
pub async fn reencrypt_on_host(
&self,
host: &str,
nixos/meta.nixdiffbeforeafterboth--- a/nixos/meta.nix
+++ b/nixos/meta.nix
@@ -1,11 +1,18 @@
-{ lib, ... }:
-with lib;
{
+ lib,
+ pkgs,
+ ...
+}:
+with lib; {
options = with types; {
+ nixpkgs.resolvedPkgs = mkOption {
+ type = types.pkgs // {description = "nixpkgs.pkgs";};
+ description = "Value of pkgs";
+ };
tags = mkOption {
type = listOf str;
description = "Host tags";
- default = [ ];
+ default = [];
};
network = mkOption {
type = submodule {
@@ -13,12 +20,12 @@
internalIps = mkOption {
type = listOf str;
description = "Internal ips";
- default = [ ];
+ default = [];
};
externalIps = mkOption {
type = listOf str;
description = "External ips";
- default = [ ];
+ default = [];
};
};
};
@@ -29,7 +36,8 @@
};
};
config = {
- tags = [ "all" ];
- network = { };
+ tags = ["all"];
+ network = {};
+ nixpkgs.resolvedPkgs = pkgs;
};
}