difftreelog
refactor minor rewrites
in: trunk
7 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -247,7 +247,7 @@
Ok(())
}
async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {
- if tracing::enabled!(Level::DEBUG) {
+ if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {
let cmd_str = String::from_utf8_lossy(cmd.as_ref());
tracing::debug!("{cmd_str}");
};
@@ -627,13 +627,6 @@
}
pub async fn field(session: NixSession, field: &str) -> Result<Self> {
Self::root(session).select([Index::var(field)]).await
- }
- pub async fn get_json_deep<'a, V: DeserializeOwned>(
- &self,
- name: impl IntoIterator<Item = Index>,
- ) -> Result<V> {
- let field = self.select(name).await?;
- field.as_json().await
}
pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {
let mut used_fields = Vec::new();
@@ -719,6 +712,19 @@
.await
.with_context(|| context(self.0.full_path.as_deref(), &query))
}
+ pub async fn has_field(&self, name: &str) -> Result<bool> {
+ let id = self.0.value.expect("can't list root fields");
+ let key = nixlike::escape_string(name);
+ let query = format!("sess_field_{id} ? {key}");
+ self.0
+ .session
+ .0
+ .lock()
+ .await
+ .execute_expression_to_json(&query)
+ .await
+ .with_context(|| context(self.0.full_path.as_deref(), &query))
+ }
pub async fn list_fields(&self) -> Result<Vec<String>> {
let id = self.0.value.expect("can't list root fields");
let query = format!("builtins.attrNames sess_field_{id}");
@@ -731,6 +737,18 @@
.await
.with_context(|| context(self.0.full_path.as_deref(), &query))
}
+ pub async fn type_of(&self) -> Result<String> {
+ let id = self.0.value.expect("can't list root fields");
+ let query = format!("builtins.typeOf sess_field_{id}");
+ self.0
+ .session
+ .0
+ .lock()
+ .await
+ .execute_expression_to_json(&query)
+ .await
+ .with_context(|| context(self.0.full_path.as_deref(), &query))
+ }
pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {
let id = self.0.value.expect("can't use build on not-value");
let query = format!(":b sess_field_{id}");
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
use crate::{
- command::MyCommand,
- fleetdata::{FleetSecret, FleetSharedSecret},
+ better_nix_eval::Field,
+ fleetdata::{FleetSecret, FleetSharedSecret, SecretData},
host::Config,
nix_go, nix_go_json,
};
@@ -8,16 +8,16 @@
use chrono::{DateTime, Utc};
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
collections::HashSet,
io::{self, Cursor, Read},
path::PathBuf,
- sync::Arc,
};
use tabled::{Table, Tabled};
use tokio::fs::read_to_string;
-use tracing::{error, info, info_span, warn};
+use tracing::{info, info_span, warn};
#[derive(Parser)]
pub enum Secret {
@@ -90,82 +90,153 @@
prefer_identities: Vec<String>,
},
List {},
- InvokeGenerator,
}
-impl Secret {
- pub async fn run(self, config: &Config) -> Result<()> {
- match self {
- Secret::InvokeGenerator => {
- let config_field = &config.config_unchecked_field;
+async fn generate_shared(
+ config: &Config,
+ display_name: &str,
+ secret: Field,
+) -> Result<FleetSharedSecret> {
+ Ok(if secret.has_field("generateImpure").await? {
+ let config_field = &config.config_unchecked_field;
+ let generate = nix_go!(secret.generateImpure);
+ let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
- let secret =
- nix_go!(config_field.configUnchecked.sharedSecrets["kube-apiserver.pem"]);
- let generate_impure = nix_go!(secret.generateImpure);
- let on = nix_go!(generate_impure.on);
- let call_package = nix_go!(
- config_field.buildableSystems(Obj {
- localSystem: { config.local_system.clone() }
- })[on]
- .config
- .nixpkgs
- .resolvedPkgs
- .callPackage
- );
- let generator = nix_go!(call_package(generate_impure.generator)(Obj {}));
- let built = &generator.build().await?["out"];
- let mut nix = MyCommand::new("nix");
- let on: String = on.as_json().await?;
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{on}"))
- .arg(built);
- nix.run_nix().await?;
+ let on: String = nix_go_json!(generate.on);
+ let call_package = nix_go!(
+ config_field.buildableSystems(Obj {
+ localSystem: { config.local_system.clone() }
+ })[{ on }]
+ .config
+ .nixpkgs
+ .resolvedPkgs
+ .callPackage
+ );
- let session = config.host(&on).await?;
+ let host = config.host(&on).await?;
- let owners: Vec<String> = nix_go_json!(secret.expectedOwners);
- dbg!(&owners);
+ let generator = nix_go!(call_package(generate.generator)(Obj {}));
+ let generator = generator.build().await?;
+ let generator = generator
+ .get("out")
+ .ok_or_else(|| anyhow!("missing generateImpure out"))?;
+ let generator = host.remote_derivation(generator).await?;
- let mut recipients = String::new();
- for owner in owners {
- let key = config.key(&owner).await?;
- recipients.push_str(&format!("-r \"{key}\" "));
- }
- recipients.push_str("-e");
+ let mut recipients = String::new();
+ for owner in &owners {
+ let key = config.key(owner).await?;
+ recipients.push_str(&format!("-r \"{key}\" "));
+ }
+ recipients.push_str("-e");
- // FIXME: security: created directory might be accessible to other users
- // This shouldn't be much of a concern, as data is encrypted right after creation, yet
- // still better to have.
- let tempdir = session.mktemp_dir().await?;
+ let out = host.mktemp_dir().await?;
- let mut gen = session.cmd(built).await?;
- gen.env("rageArgs", recipients).env("out", &tempdir);
- gen.run().await?;
+ let mut gen = host.cmd(generator).await?;
+ gen.env("rageArgs", recipients).env("out", &out);
+ gen.run().await?;
- {
- let marker = session.read_file_text(format!("{tempdir}/marker")).await?;
- ensure!(marker == "SUCCESS", "generation not succeeded");
- }
+ {
+ let marker = host.read_file_text(format!("{out}/marker")).await?;
+ ensure!(marker == "SUCCESS", "generation not succeeded");
+ }
- let public = session
- .read_file_bin(format!("{tempdir}/public"))
- .await
- .ok();
- let secret = session
- .read_file_bin(format!("{tempdir}/secret"))
- .await
- .ok();
- if let Some(secret) = &secret {
- ensure!(
- age::Decryptor::new(Cursor::new(&secret)).is_ok(),
- "builder produced non-encrypted value as secret, this is highly insecure"
- );
- }
- dbg!(&secret);
- // // .as_json().await?;
- // dbg!(&built);
- }
+ let public = host.read_file_text(format!("{out}/public")).await.ok();
+ let secret = host.read_file_bin(format!("{out}/secret")).await.ok();
+ if let Some(secret) = &secret {
+ ensure!(
+ age::Decryptor::new(Cursor::new(&secret)).is_ok(),
+ "builder produced non-encrypted value as secret, this is highly insecure"
+ );
+ }
+
+ let created_at = host.read_file_value(format!("{out}/created_at")).await?;
+ let expires_at = host.read_file_value(format!("{out}/expires_at")).await.ok();
+
+ FleetSharedSecret {
+ owners,
+ secret: FleetSecret {
+ created_at,
+ expires_at,
+ public,
+ secret: secret.map(SecretData),
+ },
+ }
+ } else {
+ bail!("no generator defined for {display_name}")
+ })
+}
+
+async fn parse_public(
+ public: Option<String>,
+ public_file: Option<PathBuf>,
+) -> Result<Option<String>> {
+ Ok(match (public, public_file) {
+ (Some(v), None) => Some(v),
+ (None, Some(v)) => Some(read_to_string(v).await?),
+ (Some(_), Some(_)) => {
+ bail!("only public or public_file should be set")
+ }
+ (None, None) => None,
+ })
+}
+
+fn parse_machines(
+ initial: Vec<String>,
+ machines: Option<Vec<String>>,
+ mut add_machines: Vec<String>,
+ mut remove_machines: Vec<String>,
+) -> Result<Vec<String>> {
+ if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {
+ bail!("no operation");
+ }
+
+ let initial_machines = initial.clone();
+ let mut target_machines = initial;
+ info!("Currently encrypted for {initial_machines:?}");
+
+ // ensure!(machines.is_some() || !add_machines.is_empty() || )
+ if let Some(machines) = machines {
+ ensure!(
+ add_machines.is_empty() && remove_machines.is_empty(),
+ "can't combine --machines and --add-machines/--remove-machines"
+ );
+ let target = initial_machines.iter().collect::<HashSet<_>>();
+ let source = machines.iter().collect::<HashSet<_>>();
+ for removed in target.difference(&source) {
+ remove_machines.push((*removed).clone());
+ }
+ for added in source.difference(&target) {
+ add_machines.push((*added).clone());
+ }
+ }
+
+ for machine in &remove_machines {
+ let mut removed = false;
+ while let Some(pos) = target_machines.iter().position(|m| m == machine) {
+ target_machines.swap_remove(pos);
+ removed = true;
+ }
+ if !removed {
+ warn!("secret is not enabled for {machine}");
+ }
+ }
+ for machine in &add_machines {
+ if target_machines.iter().any(|m| m == machine) {
+ warn!("secret is already added to {machine}");
+ } else {
+ target_machines.push(machine.to_owned());
+ }
+ }
+ if !remove_machines.is_empty() {
+ // TODO: maybe force secret regeneration?
+ // Not that useful without revokation.
+ warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
+ }
+ Ok(target_machines)
+}
+impl Secret {
+ pub async fn run(self, config: &Config) -> Result<()> {
+ match self {
Secret::ForceKeys => {
for host in config.list_hosts().await? {
if config.should_skip(&host.name) {
@@ -199,9 +270,8 @@
machines = shared.owners;
}
- let recipients = futures::stream::iter(machines.iter())
- .then(|m| config.recipient(m))
- .try_collect::<Vec<_>>()
+ let recipients = config
+ .recipients(&machines.iter().map(String::as_str).collect_vec())
.await?;
let secret = {
@@ -209,22 +279,15 @@
io::stdin().read_to_end(&mut input)?;
if input.is_empty() {
- input
+ None
} else {
- let mut encrypted = vec![];
- let recipients = recipients
- .iter()
- .cloned()
- .map(|r| Box::new(r) as Box<dyn age::Recipient + Send>)
- .collect();
- let mut encryptor = age::Encryptor::with_recipients(recipients)
- .ok_or_else(|| anyhow!("no recipients provided"))?
- .wrap_output(&mut encrypted)?;
- io::copy(&mut Cursor::new(input), &mut encryptor)?;
- encryptor.finish()?;
- encrypted
+ Some(
+ SecretData::encrypt(recipients, input)
+ .ok_or_else(|| anyhow!("no recipients provided"))?,
+ )
}
};
+ let public = parse_public(public, public_file).await?;
config.replace_shared(
name,
FleetSharedSecret {
@@ -233,14 +296,7 @@
created_at: Utc::now(),
expires_at,
secret,
- public: match (public, public_file) {
- (Some(v), None) => Some(v),
- (None, Some(v)) => Some(read_to_string(v).await?),
- (Some(_), Some(_)) => {
- bail!("only public or public_file should be set")
- }
- (None, None) => None,
- },
+ public,
},
},
);
@@ -261,19 +317,14 @@
bail!("no data provided")
}
- let mut encrypted = vec![];
- let recipient = Box::new(recipient) as Box<dyn age::Recipient + Send>;
- let mut encryptor = age::Encryptor::with_recipients(vec![recipient])
- .expect("recipients provided")
- .wrap_output(&mut encrypted)?;
- io::copy(&mut Cursor::new(input), &mut encryptor)?;
- encryptor.finish()?;
- encrypted
+ Some(SecretData::encrypt(vec![recipient], input).expect("recipient provided"))
};
if config.has_secret(&machine, &name) && !force {
bail!("secret already defined");
}
+ let public = parse_public(public, public_file).await?;
+
config.insert_secret(
&machine,
name,
@@ -281,16 +332,10 @@
created_at: Utc::now(),
expires_at: None,
secret,
- public: match (public, public_file) {
- (Some(v), None) => Some(v),
- (None, Some(v)) => Some(std::fs::read_to_string(v)?),
- (Some(_), Some(_)) => bail!("only public or public_file should be set"),
- (None, None) => None,
- },
+ public,
},
);
}
- // TODO: Instead of using sudo, decode secret on remote machine
#[allow(clippy::await_holding_refcell_ref)]
Secret::Read {
name,
@@ -298,11 +343,11 @@
plaintext,
} => {
let secret = config.host_secret(&machine, &name)?;
- if secret.secret.is_empty() {
+ let Some(secret) = secret.secret else {
bail!("no secret {name}");
- }
+ };
let host = config.host(&machine).await?;
- let data = host.decrypt(secret.secret).await?;
+ let data = host.decrypt(secret).await?;
if plaintext {
let s = String::from_utf8(data).context("output is not utf8")?;
print!("{s}");
@@ -313,59 +358,22 @@
Secret::UpdateShared {
name,
machines,
- mut add_machines,
- mut remove_machines,
+ add_machines,
+ remove_machines,
prefer_identities,
} => {
- if machines.is_none() && add_machines.is_empty() && remove_machines.is_empty() {
- bail!("no operation");
- }
-
let mut secret = config.shared_secret(&name)?;
- if secret.secret.secret.is_empty() {
+ if secret.secret.secret.is_none() {
bail!("no secret");
}
let initial_machines = secret.owners.clone();
- let mut target_machines = secret.owners.clone();
- info!("Currently encrypted for {initial_machines:?}");
-
- // ensure!(machines.is_some() || !add_machines.is_empty() || )
- if let Some(machines) = machines {
- ensure!(
- add_machines.is_empty() && remove_machines.is_empty(),
- "can't combine --machines and --add-machines/--remove-machines"
- );
- let target = initial_machines.iter().collect::<HashSet<_>>();
- let source = machines.iter().collect::<HashSet<_>>();
- for removed in target.difference(&source) {
- remove_machines.push((*removed).clone());
- }
- for added in source.difference(&target) {
- add_machines.push((*added).clone());
- }
- }
-
- for machine in &remove_machines {
- let mut removed = false;
- while let Some(pos) = target_machines.iter().position(|m| m == machine) {
- target_machines.swap_remove(pos);
- removed = true;
- }
- if !removed {
- warn!("secret is not enabled for {machine}");
- }
- }
- for machine in &add_machines {
- if target_machines.iter().any(|m| m == machine) {
- warn!("secret is already added to {machine}");
- } else {
- target_machines.push(machine.to_owned());
- }
- }
- if !remove_machines.is_empty() {
- warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
- }
+ let target_machines = parse_machines(
+ initial_machines.clone(),
+ machines,
+ add_machines,
+ remove_machines,
+ )?;
if target_machines.is_empty() {
info!("no machines left for secret, removing it");
@@ -395,12 +403,14 @@
let target_recipients =
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret.secret.secret, target_recipients)
- .await?;
+ if let Some(data) = secret.secret.secret {
+ let encrypted = config
+ .reencrypt_on_host(identity_holder, data, target_recipients)
+ .await?;
+ secret.secret.secret = Some(encrypted);
+ }
secret.owners = target_machines;
- secret.secret.secret = encrypted;
config.replace_shared(name, secret);
}
Secret::Regenerate { prefer_identities } => {
@@ -412,14 +422,20 @@
.collect::<HashSet<_>>();
let shared_set = config.list_shared().into_iter().collect::<HashSet<_>>();
for removed in expected_shared_set.difference(&shared_set) {
- error!("secret needs to be generated: {removed}")
+ info!("generating secret: {removed}");
+ let config_field = &config.config_unchecked_field;
+ let config_field = nix_go!(config_field.configUnchecked);
+ let secret = nix_go!(config_field.sharedSecrets[{ removed }]);
+ let shared = generate_shared(config, removed, secret).await?;
+ config.replace_shared(removed.to_string(), shared)
}
}
let mut to_remove = Vec::new();
for name in &config.list_shared() {
info!("updating secret: {name}");
let mut data = config.shared_secret(name)?;
- let config_field = &config.config_field;
+ let config_field = &config.config_unchecked_field;
+ let config_field = nix_go!(config_field.configUnchecked);
let expected_owners: Vec<String> =
nix_go_json!(config_field.sharedSecrets[{ name }].expectedOwners);
if expected_owners.is_empty() {
@@ -430,50 +446,52 @@
let set = data.owners.iter().collect::<HashSet<_>>();
let expected_set = expected_owners.iter().collect::<HashSet<_>>();
let should_remove = set.difference(&expected_set).next().is_some();
- if set != expected_set {
- let owner_dependent: bool =
- nix_go_json!(config_field.sharedSecrets[{ name }].ownerDependent);
- if !owner_dependent {
- warn!("reencrypting secret '{name}' for new owner set");
- // TODO: force regeneration
- if should_remove {
- warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
- }
+ if set == expected_set {
+ info!("secret data is ok");
+ continue;
+ }
+
+ let secret = nix_go!(config_field.sharedSecrets[{ name }]);
+ let owner_dependent: bool = nix_go_json!(secret.ownerDependent);
+ let regenerate_on_remove: bool = nix_go_json!(secret.regenerateOnOwnerRemoved);
+ #[allow(clippy::nonminimal_bool)]
+ if !owner_dependent && !(should_remove && regenerate_on_remove) {
+ warn!("reencrypting secret '{name}' for new owner set");
+ // TODO: force regeneration
+ if should_remove {
+ warn!("secret will not be regenerated for removed machines, and until host rebuild, they will still possess the ability to decode secret");
+ }
- let identity_holder = if !prefer_identities.is_empty() {
- prefer_identities
- .iter()
- .find(|i| data.owners.iter().any(|s| s == *i))
- } else {
- data.owners.first()
- };
- let Some(identity_holder) = identity_holder else {
- bail!("no available holder found");
- };
+ let identity_holder = if !prefer_identities.is_empty() {
+ prefer_identities
+ .iter()
+ .find(|i| data.owners.iter().any(|s| s == *i))
+ } else {
+ data.owners.first()
+ };
+ let Some(identity_holder) = identity_holder else {
+ bail!("no available holder found");
+ };
- let target_recipients = futures::stream::iter(&expected_owners)
- .then(|m| async { config.key(m).await })
- .collect::<Vec<_>>()
- .await;
- let target_recipients =
- target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
+ let target_recipients = futures::stream::iter(&expected_owners)
+ .then(|m| async { config.key(m).await })
+ .collect::<Vec<_>>()
+ .await;
+ let target_recipients =
+ target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
+ if let Some(secret) = data.secret.secret {
let encrypted = config
- .reencrypt_on_host(
- identity_holder,
- data.secret.secret,
- target_recipients,
- )
+ .reencrypt_on_host(identity_holder, secret, target_recipients)
.await?;
- data.secret.secret = encrypted;
- data.owners = expected_owners;
- config.replace_shared(name.to_owned(), data);
- } else {
- error!("secret '{name}' should be regenerated manually");
+ data.secret.secret = Some(encrypted);
}
+ data.owners = expected_owners;
+ config.replace_shared(name.to_owned(), data);
} else {
- info!("secret data is ok")
+ let shared = generate_shared(config, name, secret).await?;
+ config.replace_shared(name.to_owned(), shared)
}
}
for k in to_remove {
cmds/fleet/src/fleetdata.rsdiffbeforeafterboth--- a/cmds/fleet/src/fleetdata.rs
+++ b/cmds/fleet/src/fleetdata.rs
@@ -1,8 +1,13 @@
+use age::Recipient;
use anyhow::Result;
use chrono::{DateTime, Utc};
+use itertools::Itertools;
use nixlike::format_nix;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::BTreeMap;
+use std::{
+ collections::BTreeMap,
+ io::{self, Cursor},
+};
use tempfile::TempDir;
use tokio::{
fs::{self, File},
@@ -41,6 +46,43 @@
}
#[derive(Serialize, Deserialize, Clone)]
+pub struct SecretData(
+ #[serde(
+ default,
+ skip_serializing_if = "Vec::is_empty",
+ serialize_with = "as_z85",
+ deserialize_with = "from_z85"
+ )]
+ pub Vec<u8>,
+);
+impl SecretData {
+ /// Returns None if recipients.is_empty()
+ pub fn encrypt(
+ recipients: impl IntoIterator<Item = impl Recipient + Send + 'static>,
+ data: Vec<u8>,
+ ) -> Option<Self> {
+ let mut encrypted = vec![];
+ let recipients = recipients
+ .into_iter()
+ .map(|v| Box::new(v) as Box<dyn Recipient + Send>)
+ .collect_vec();
+ let mut encryptor = age::Encryptor::with_recipients(recipients)?
+ .wrap_output(&mut encrypted)
+ .expect("in memory write");
+ io::copy(&mut Cursor::new(data), &mut encryptor).expect("in memory copy");
+ encryptor.finish().expect("in memory flush");
+ Some(Self(encrypted))
+ }
+ pub fn encode_z85(&self) -> String {
+ z85::encode(&self.0)
+ }
+ pub fn decode_z85(v: &str) -> Result<Self> {
+ let v = z85::decode(v)?;
+ Ok(Self(v))
+ }
+}
+
+#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
#[must_use]
pub struct FleetSecret {
@@ -51,13 +93,8 @@
pub expires_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub public: Option<String>,
- #[serde(
- default,
- skip_serializing_if = "Vec::is_empty",
- serialize_with = "as_z85",
- deserialize_with = "from_z85"
- )]
- pub secret: Vec<u8>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub secret: Option<SecretData>,
}
fn as_z85<S>(key: &[u8], serializer: S) -> Result<S::Ok, S::Error>
cmds/fleet/src/host.rsdiffbeforeafterboth1use std::{2 env::current_dir,3 ffi::{OsStr, OsString},4 io::Write,5 ops::Deref,6 path::PathBuf,7 sync::{Arc, Mutex, MutexGuard, OnceLock},8};910use anyhow::{anyhow, bail, Context, Result};11use clap::{ArgGroup, Parser};12use openssh::SessionBuilder;13use tempfile::NamedTempFile;1415use crate::{16 better_nix_eval::{Field, NixSessionPool},17 command::MyCommand,18 fleetdata::{FleetData, FleetSecret, FleetSharedSecret},19 nix_go, nix_go_json,20};2122pub struct FleetConfigInternals {23 pub local_system: String,24 pub directory: PathBuf,25 pub opts: FleetOpts,26 pub data: Mutex<FleetData>,27 pub nix_args: Vec<OsString>,28 /// fleetConfigurations.<name>.<localSystem>29 pub fleet_field: Field,30 /// fleet_config.configUnchecked31 pub config_field: Field,32 /// fleet_config.unchecked33 pub config_unchecked_field: Field,34}3536#[derive(Clone)]37pub struct Config(Arc<FleetConfigInternals>);3839impl Deref for Config {40 type Target = FleetConfigInternals;4142 fn deref(&self) -> &Self::Target {43 &self.044 }45}4647pub struct ConfigHost {48 pub name: String,49 pub session: OnceLock<Arc<openssh::Session>>,50}51impl ConfigHost {52 pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {53 // FIXME: TOCTOU54 if let Some(session) = &self.session.get() {55 return Ok((*session).clone());56 };57 let session = SessionBuilder::default();5859 let session = session60 .connect(&self.name)61 .await62 .map_err(|e| anyhow!("ssh error: {e}"))?;63 let session = Arc::new(session);64 self.session.set(session.clone()).expect("TOCTOU happened");65 Ok(session)66 }67 pub async fn mktemp_dir(&self) -> Result<String> {68 let mut cmd = self.cmd("mktemp").await?;69 cmd.arg("-d");70 let path = cmd.run_string().await?;71 Ok(path.trim_end().to_owned())72 }73 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {74 let mut cmd = self.cmd("cat").await?;75 cmd.arg(path);76 cmd.run_bytes().await77 }78 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {79 let mut cmd = self.cmd("cat").await?;80 cmd.arg(path);81 cmd.run_string().await82 }83 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {84 let session = self.open_session().await?;85 Ok(MyCommand::new_on(cmd, session))86 }8788 pub async fn decrypt(&self, data: Vec<u8>) -> Result<Vec<u8>> {89 let mut cmd = self.cmd("fleet-install-secrets").await?;90 cmd.arg("decrypt").eqarg("--secret", z85::encode(&data));91 let encoded = cmd92 .sudo()93 .run_string()94 .await95 .context("failed to call remote host for decrypt")?;96 z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")97 }98}99100impl Config {101 pub fn should_skip(&self, host: &str) -> bool {102 if !self.opts.skip.is_empty() {103 self.opts.skip.iter().any(|h| h as &str == host)104 } else if !self.opts.only.is_empty() {105 !self.opts.only.iter().any(|h| h as &str == host)106 } else {107 false108 }109 }110 pub fn is_local(&self, host: &str) -> bool {111 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)112 }113114 pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {115 if sudo {116 command = command.sudo();117 }118 if !self.is_local(host) {119 command = command.ssh(host);120 }121 command.run().await122 }123 pub async fn run_string_on(124 &self,125 host: &str,126 mut command: MyCommand,127 sudo: bool,128 ) -> Result<String> {129 if sudo {130 command = command.sudo();131 }132 if !self.is_local(host) {133 command = command.ssh(host);134 }135 command.run_string().await136 }137138 pub async fn host(&self, name: &str) -> Result<ConfigHost> {139 Ok(ConfigHost {140 name: name.to_owned(),141 session: OnceLock::new(),142 })143 }144 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {145 let fleet_field = &self.fleet_field;146 let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;147 let mut out = vec![];148 for name in names {149 out.push(ConfigHost {150 name,151 session: OnceLock::new(),152 })153 }154 Ok(out)155 }156 pub async fn system_config(&self, host: &str) -> Result<Field> {157 let fleet_field = &self.fleet_field;158 Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))159 }160161 pub(super) fn data(&self) -> MutexGuard<FleetData> {162 self.data.lock().unwrap()163 }164 pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {165 self.data.lock().unwrap()166 }167 /// Shared secrets configured in fleet.nix or in flake168 pub async fn list_configured_shared(&self) -> Result<Vec<String>> {169 let config_field = &self.config_field;170 nix_go!(config_field.sharedSecrets).list_fields().await171 }172 /// Shared secrets configured in fleet.nix173 pub fn list_shared(&self) -> Vec<String> {174 let data = self.data();175 data.shared_secrets.keys().cloned().collect()176 }177 pub fn has_shared(&self, name: &str) -> bool {178 let data = self.data();179 data.shared_secrets.contains_key(name)180 }181 pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {182 let mut data = self.data_mut();183 data.shared_secrets.insert(name.to_owned(), shared);184 }185 pub fn remove_shared(&self, secret: &str) {186 let mut data = self.data_mut();187 data.shared_secrets.remove(secret);188 }189190 pub fn has_secret(&self, host: &str, secret: &str) -> bool {191 let data = self.data();192 let Some(host_secrets) = data.host_secrets.get(host) else {193 return false;194 };195 host_secrets.contains_key(secret)196 }197 pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {198 let mut data = self.data_mut();199 let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();200 host_secrets.insert(secret, value);201 }202203 pub async fn reencrypt_on_host(204 &self,205 host: &str,206 data: Vec<u8>,207 targets: Vec<String>,208 ) -> Result<Vec<u8>> {209 let data = z85::encode(&data);210 let mut recmd = MyCommand::new("fleet-install-secrets");211 recmd.arg("reencrypt").eqarg("--secret", data);212 for target in targets {213 recmd.eqarg("--targets", target);214 }215 recmd = recmd.sudo().ssh(host);216 let encoded = recmd217 .run_string()218 .await219 .context("failed to call remote host for decrypt")?220 .trim()221 .to_owned();222 z85::decode(encoded).context("bad encoded data? outdated host?")223 }224225 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {226 let data = self.data();227 let Some(host_secrets) = data.host_secrets.get(host) else {228 bail!("no secrets for machine {host}");229 };230 let Some(secret) = host_secrets.get(secret) else {231 bail!("machine {host} has no secret {secret}");232 };233 Ok(secret.clone())234 }235 pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {236 let data = self.data();237 let Some(secret) = data.shared_secrets.get(secret) else {238 bail!("no shared secret {secret}");239 };240 Ok(secret.clone())241 }242 pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {243 let config_field = &self.config_field;244 Ok(nix_go_json!(245 config_field.sharedSecrets[{ secret }].expectedOwners246 ))247 }248249 pub fn save(&self) -> Result<()> {250 let mut tempfile = NamedTempFile::new_in(self.directory.clone())?;251 let data = nixlike::serialize(&self.data() as &FleetData)?;252 tempfile.write_all(253 format!(254 "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",255 data256 )257 .as_bytes(),258 )?;259 let mut fleet_data_path = self.directory.clone();260 fleet_data_path.push("fleet.nix");261 tempfile.persist(fleet_data_path)?;262 Ok(())263 }264}265266#[derive(Parser, Clone)]267#[clap(group = ArgGroup::new("target_hosts"))]268pub struct FleetOpts {269 /// All hosts except those would be skipped270 #[clap(long, number_of_values = 1, group = "target_hosts")]271 only: Vec<String>,272273 /// Hosts to skip274 #[clap(long, number_of_values = 1, group = "target_hosts")]275 skip: Vec<String>,276277 /// Host, which should be threaten as current machine278 #[clap(long)]279 pub localhost: Option<String>,280281 /// Override detected system for host, to perform builds via282 /// binfmt-declared qemu instead of trying to crosscompile283 #[clap(long, default_value = "detect")]284 pub local_system: String,285}286287impl FleetOpts {288 pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {289 if self.localhost.is_none() {290 self.localhost291 .replace(hostname::get().unwrap().to_str().unwrap().to_owned());292 }293 let directory = current_dir()?;294295 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;296 let root_field = pool.get().await?;297298 if self.local_system == "detect" {299 let builtins_field = Field::field(root_field.clone(), "builtins").await?;300 self.local_system = nix_go_json!(builtins_field.currentSystem);301 }302 let local_system = self.local_system.clone();303304 let fleet_root = Field::field(root_field, "fleetConfigurations").await?;305306 let fleet_field = nix_go!(fleet_root.default);307 let config_field = nix_go!(fleet_field.configUnchecked);308 let config_unchecked_field = nix_go!(fleet_field.unchecked);309310 let mut fleet_data_path = directory.clone();311 fleet_data_path.push("fleet.nix");312 let bytes = std::fs::read_to_string(fleet_data_path)?;313 let data = nixlike::parse_str(&bytes)?;314315 Ok(Config(Arc::new(FleetConfigInternals {316 opts: self,317 directory,318 data,319 local_system,320 nix_args,321 fleet_field,322 config_field,323 config_unchecked_field,324 })))325 }326}cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -2,7 +2,9 @@
use crate::command::MyCommand;
use crate::host::Config;
+use age::Recipient;
use anyhow::{anyhow, Result};
+use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use tracing::warn;
@@ -36,11 +38,18 @@
}
}
/// Insecure, requires root
- pub async fn recipient(&self, host: &str) -> anyhow::Result<age::ssh::Recipient> {
+ pub async fn recipient(&self, host: &str) -> anyhow::Result<impl Recipient> {
let key = self.key(host).await?;
age::ssh::Recipient::from_str(&key).map_err(|e| anyhow!("parse recipient error: {:?}", e))
}
+ pub async fn recipients(&self, hosts: &[&str]) -> Result<Vec<impl Recipient>> {
+ futures::stream::iter(hosts.iter())
+ .then(|m| self.recipient(m))
+ .try_collect::<Vec<_>>()
+ .await
+ }
+
#[allow(dead_code)]
pub async fn orphaned_data(&self) -> Result<Vec<String>> {
let mut out = Vec::new();
crates/nixlike/src/lib.rsdiffbeforeafterboth--- a/crates/nixlike/src/lib.rs
+++ b/crates/nixlike/src/lib.rs
@@ -10,6 +10,8 @@
mod se_impl;
mod to_string;
+pub use to_string::escape_string;
+
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("bad number")]
crates/nixlike/src/to_string.rsdiffbeforeafterboth--- a/crates/nixlike/src/to_string.rs
+++ b/crates/nixlike/src/to_string.rs
@@ -25,8 +25,8 @@
}
}
-fn write_nix_str(str: &str, out: &mut String) {
- out.push_str(&format!(
+pub fn escape_string(str: &str) -> String {
+ format!(
"\"{}\"",
str.replace('\\', "\\\\")
.replace('"', "\\\"")
@@ -34,7 +34,11 @@
.replace('\t', "\\t")
.replace('\r', "\\r")
.replace('$', "\\$")
- ))
+ )
+}
+
+pub fn write_nix_str(str: &str, out: &mut String) {
+ out.push_str(&escape_string(str))
}
fn write_nix_buf(value: &Value, out: &mut String) {