1use std::{2 cell::{LazyCell, OnceCell},3 collections::BTreeMap,4 env::current_dir,5 ffi::{OsStr, OsString},6 fmt::Display,7 io::Write,8 ops::Deref,9 path::PathBuf,10 str::FromStr,11 sync::{Arc, Mutex, MutexGuard, OnceLock},12};1314use anyhow::{anyhow, bail, ensure, Context, Result};15use clap::Parser;16use fleet_shared::SecretData;17use nix_eval::{nix_go, nix_go_json, util::assert_warn, NixSessionPool, Value};18use nom::{19 bytes::complete::take_while1,20 character::complete::char,21 combinator::{map, opt},22 multi::separated_list1,23 sequence::{preceded, separated_pair},24};25use openssh::SessionBuilder;26use serde::de::DeserializeOwned;27use tempfile::NamedTempFile;28use tracing::error;2930use crate::{31 command::MyCommand,32 fleetdata::{FleetData, FleetSecret, FleetSharedSecret},33};3435pub struct FleetConfigInternals {36 pub local_system: String,37 pub directory: PathBuf,38 pub opts: FleetOpts,39 pub data: Mutex<FleetData>,40 pub nix_args: Vec<OsString>,41 42 pub config_field: Value,4344 45 pub default_pkgs: Value,46}4748#[derive(Clone)]49pub struct Config(Arc<FleetConfigInternals>);5051impl Deref for Config {52 type Target = FleetConfigInternals;5354 fn deref(&self) -> &Self::Target {55 &self.056 }57}5859#[derive(Clone, Copy, Debug)]60pub enum EscalationStrategy {61 Sudo,62 Run0,63 Su,64}6566pub struct ConfigHost {67 config: Config,68 pub name: String,69 pub local: bool,70 pub session: OnceLock<Arc<openssh::Session>>,71 groups: OnceCell<Vec<String>>,7273 pub host_config: Option<Value>,74 pub nixos_config: OnceCell<Value>,75}76impl ConfigHost {77 pub async fn escalation_strategy(&self) -> Result<EscalationStrategy> {78 79 80 if let Ok(_) = self.find_in_path("sudo").await {81 return Ok(EscalationStrategy::Sudo);82 }83 if let Ok(_) = self.find_in_path("run0").await {84 return Ok(EscalationStrategy::Run0);85 }86 Ok(EscalationStrategy::Su)87 }88 89 90 pub async fn tags(&self) -> Result<Vec<String>> {91 if let Some(v) = self.groups.get() {92 return Ok(v.clone());93 }94 let Some(host_config) = &self.host_config else {95 return Ok(vec![]);96 };97 let tags: Vec<String> = nix_go_json!(host_config.tags);9899 let _ = self.groups.set(tags.clone());100101 Ok(tags)102 }103 pub async fn nixos_config(&self) -> Result<Value> {104 if let Some(v) = self.nixos_config.get() {105 return Ok(v.clone());106 }107 let Some(host_config) = &self.host_config else {108 bail!("local host has no nixos_config");109 };110 let nixos_config = nix_go!(host_config.nixos.config);111 assert_warn("nixos config evaluation", &nixos_config).await?;112113 let _ = self.nixos_config.set(nixos_config.clone());114115 Ok(nixos_config)116 }117 async fn open_session(&self) -> Result<Arc<openssh::Session>> {118 assert!(!self.local, "do not open ssh connection to local session");119 120 if let Some(session) = &self.session.get() {121 return Ok((*session).clone());122 };123 let mut session = SessionBuilder::default();124 let session = session125 .connect(&self.name)126 .await127 .map_err(|e| anyhow!("ssh error while connecting to {}: {e}", self.name))?;128 let session = Arc::new(session);129 self.session.set(session.clone()).expect("TOCTOU happened");130 Ok(session)131 }132 pub async fn mktemp_dir(&self) -> Result<String> {133 let mut cmd = self.cmd("mktemp").await?;134 cmd.arg("-d");135 let path = cmd.run_string().await?;136 Ok(path.trim_end().to_owned())137 }138 pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {139 let mut cmd = self.cmd("cat").await?;140 cmd.arg(path);141 cmd.run_bytes().await142 }143 pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {144 let mut cmd = self.cmd("cat").await?;145 cmd.arg(path);146 cmd.run_string().await147 }148 pub async fn read_dir(&self, path: impl AsRef<OsStr>) -> Result<Vec<String>> {149 let mut cmd = self.cmd("ls").await?;150 cmd.arg(path);151 let out = cmd.run_string().await?;152 let mut lines = out.split('\n');153 if let Some(last) = lines.next_back() {154 ensure!(last.is_empty(), "output of ls should end with newline");155 }156 Ok(lines.map(ToOwned::to_owned).collect())157 }158 #[allow(dead_code)]159 pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {160 let text = self.read_file_text(path).await?;161 Ok(serde_json::from_str(&text)?)162 }163 pub async fn read_env(&self, env: &str) -> Result<String> {164 let mut cmd = self.cmd("printenv").await?;165 cmd.arg(env);166 Ok(cmd.run_string().await?)167 }168 pub async fn find_in_path(&self, command: &str) -> Result<String> {169 170 171 172 173 174 175 176 177 178 179 180 181 let mut cmd = self182 .cmd_escalation(183 184 EscalationStrategy::Su,185 "which",186 )187 .await?;188 cmd.arg(command);189 cmd.run_string().await190 }191 pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>192 where193 <D as FromStr>::Err: Display,194 {195 let text = self.read_file_text(path).await?;196 D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))197 }198 pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {199 self.cmd_escalation(self.escalation_strategy().await?, cmd)200 .await201 }202 pub async fn cmd_escalation(203 &self,204 escalation: EscalationStrategy,205 cmd: impl AsRef<OsStr>,206 ) -> Result<MyCommand> {207 if self.local {208 Ok(MyCommand::new(escalation, cmd))209 } else {210 let session = self.open_session().await?;211 Ok(MyCommand::new_on(escalation, cmd, session))212 }213 }214215 pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {216 ensure!(data.encrypted, "secret is not encrypted");217 let mut cmd = self.cmd("fleet-install-secrets").await?;218 cmd.arg("decrypt").eqarg("--secret", data.to_string());219 let encoded = cmd220 .sudo()221 .run_string()222 .await223 .context("failed to call remote host for decrypt")?;224 let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;225 ensure!(!data.encrypted, "secret came out encrypted");226 Ok(data.data)227 }228 pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {229 ensure!(data.encrypted, "secret is not encrypted");230 let mut cmd = self.cmd("fleet-install-secrets").await?;231 cmd.arg("reencrypt").eqarg("--secret", data.to_string());232 for target in targets {233 let key = self.config.key(&target).await?;234 cmd.eqarg("--targets", key);235 }236 let encoded = cmd237 .sudo()238 .run_string()239 .await240 .context("failed to call remote host for decrypt")?;241 let data: SecretData = encoded.parse().map_err(|e| anyhow!("{e}"))?;242 ensure!(data.encrypted, "secret came out not encrypted");243 Ok(data)244 }245 246 pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {247 if self.local {248 249 return Ok(path.to_owned());250 }251 let mut nix = MyCommand::new(252 253 EscalationStrategy::Su,254 "nix",255 );256 nix.arg("copy")257 .arg("--substitute-on-destination")258 .comparg("--to", format!("ssh-ng://{}", self.name))259 .arg(path);260 nix.run_nix().await.context("nix copy")?;261 Ok(path.to_owned())262 }263 pub async fn systemctl_stop(&self, name: &str) -> Result<()> {264 let mut cmd = self.cmd("systemctl").await?;265 cmd.arg("stop").arg(name);266 cmd.sudo().run().await267 }268 pub async fn systemctl_start(&self, name: &str) -> Result<()> {269 let mut cmd = self.cmd("systemctl").await?;270 cmd.arg("start").arg(name);271 cmd.sudo().run().await272 }273274 pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {275 let mut cmd = self.cmd("rm").await?;276 cmd.arg("-f").arg(path);277 if sudo {278 cmd = cmd.sudo()279 }280 cmd.run().await281 }282283 pub async fn list_configured_secrets(&self) -> Result<Vec<String>> {284 let nixos = self.nixos_config().await?;285 let secrets = nix_go!(nixos.secrets);286 let mut out = Vec::new();287 for name in secrets.list_fields().await? {288 let secret = nix_go!(secrets[{ name }]);289 let is_shared: bool = nix_go_json!(secret.shared);290 if is_shared {291 continue;292 }293 out.push(name);294 }295 Ok(out)296 }297 pub async fn secret_field(&self, name: &str) -> Result<Value> {298 let nixos = self.nixos_config().await?;299 Ok(nix_go!(nixos.secrets[{ name }]))300 }301302 303 pub async fn pkgs(&self) -> Result<Value> {304 let Some(host_config) = &self.host_config else {305 bail!("local host has no host_config");306 };307 308 Ok(nix_go!(host_config.nixos.options._module.args.value.pkgs))309 }310}311312impl Config {313 pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {314 if !self.opts.skip.is_empty() && self.opts.skip.iter().any(|h| h as &str == host.name) {315 return Ok(true);316 }317 if self.opts.only.is_empty() {318 return Ok(false);319 }320 let mut have_group_matches = false;321 for item in self.opts.only.iter() {322 match item {323 HostItem::Host { name, .. } if *name == host.name => {324 return Ok(false);325 }326 HostItem::Tag { .. } => {327 have_group_matches = true;328 }329 _ => {}330 }331 }332 if have_group_matches {333 let host_tags = host.tags().await?;334 for item in self.opts.only.iter() {335 match item {336 HostItem::Tag { name, .. } if host_tags.contains(name) => {337 return Ok(false);338 }339 _ => {}340 }341 }342 }343 Ok(true)344 }345 pub async fn action_attr(&self, host: &ConfigHost, attr: &str) -> Result<Option<String>> {346 if self.opts.only.is_empty() {347 return Ok(None);348 }349 let mut have_group_matches = false;350 for item in self.opts.only.iter() {351 match item {352 HostItem::Host { name, attrs }353 if *name == host.name && attrs.contains_key(attr) =>354 {355 return Ok(attrs.get(attr).cloned());356 }357 HostItem::Tag { attrs, .. } if attrs.contains_key(attr) => {358 have_group_matches = true;359 }360 _ => {}361 }362 }363 if have_group_matches {364 let host_tags = host.tags().await?;365 for item in self.opts.only.iter() {366 match item {367 HostItem::Tag { name, attrs }368 if host_tags.contains(name) && attrs.contains_key(attr) =>369 {370 return Ok(attrs.get(attr).cloned());371 }372 _ => {}373 }374 }375 }376 Ok(None)377 }378 pub fn is_local(&self, host: &str) -> bool {379 self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)380 }381382 pub fn local_host(&self) -> ConfigHost {383 ConfigHost {384 config: self.clone(),385 name: "<virtual localhost>".to_owned(),386 local: true,387 session: OnceLock::new(),388 host_config: None,389 nixos_config: OnceCell::new(),390 groups: {391 let cell = OnceCell::new();392 let _ = cell.set(vec![]);393 cell394 },395 }396 }397398 pub async fn host(&self, name: &str) -> Result<ConfigHost> {399 let config = &self.config_field;400 let host_config = nix_go!(config.hosts[{ name }]);401402 Ok(ConfigHost {403 config: self.clone(),404 name: name.to_owned(),405 local: self.is_local(name),406 session: OnceLock::new(),407 host_config: Some(host_config),408 nixos_config: OnceCell::new(),409 groups: OnceCell::new(),410 })411 }412 pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {413 let config = &self.config_field;414 let names = nix_go!(config.hosts).list_fields().await?;415 let mut out = vec![];416 for name in names {417 out.push(self.host(&name).await?);418 }419 Ok(out)420 }421 pub async fn system_config(&self, host: &str) -> Result<Value> {422 let fleet_field = &self.config_field;423 Ok(nix_go!(fleet_field.hosts[{ host }].nixos.config))424 }425426 pub(super) fn data(&self) -> MutexGuard<FleetData> {427 self.data.lock().unwrap()428 }429 pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {430 self.data.lock().unwrap()431 }432 433 pub async fn list_configured_shared(&self) -> Result<Vec<String>> {434 let config_field = &self.config_field;435 Ok(nix_go!(config_field.sharedSecrets).list_fields().await?)436 }437 438 pub fn list_shared(&self) -> Vec<String> {439 let data = self.data();440 data.shared_secrets.keys().cloned().collect()441 }442 pub fn has_shared(&self, name: &str) -> bool {443 let data = self.data();444 data.shared_secrets.contains_key(name)445 }446 pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {447 let mut data = self.data_mut();448 data.shared_secrets.insert(name.to_owned(), shared);449 }450 pub fn remove_shared(&self, secret: &str) {451 let mut data = self.data_mut();452 data.shared_secrets.remove(secret);453 }454455 pub fn list_secrets(&self, host: &str) -> Vec<String> {456 let data = self.data();457 let Some(secrets) = data.host_secrets.get(host) else {458 return Vec::new();459 };460 secrets.keys().cloned().collect()461 }462463 pub fn has_secret(&self, host: &str, secret: &str) -> bool {464 let data = self.data();465 let Some(host_secrets) = data.host_secrets.get(host) else {466 return false;467 };468 host_secrets.contains_key(secret)469 }470 pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {471 let mut data = self.data_mut();472 let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();473 host_secrets.insert(secret, value);474 }475476 pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {477 let data = self.data();478 let Some(host_secrets) = data.host_secrets.get(host) else {479 bail!("no secrets for machine {host}");480 };481 let Some(secret) = host_secrets.get(secret) else {482 bail!("machine {host} has no secret {secret}");483 };484 Ok(secret.clone())485 }486 pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {487 let data = self.data();488 let Some(secret) = data.shared_secrets.get(secret) else {489 bail!("no shared secret {secret}");490 };491 Ok(secret.clone())492 }493 pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {494 let config_field = &self.config_field;495 Ok(nix_go_json!(496 config_field.sharedSecrets[{ secret }].expectedOwners497 ))498 }499500 pub fn save(&self) -> Result<()> {501 let mut tempfile = NamedTempFile::new_in(self.directory.clone()).context("failed to create updated version of fleet.nix in the same directory as original.\nDo you have write access to it? Access only to the fleet.nix won't be enough, the directory is used for atomic overwrite operation.\nIt is not recommended to use fleet by root anyway, move fleet project to your home directory.")?;502 let data = nixlike::serialize(&self.data() as &FleetData)?;503 tempfile.write_all(504 format!(505 "# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",506 data507 )508 .as_bytes(),509 )?;510 let mut fleet_data_path = self.directory.clone();511 fleet_data_path.push("fleet.nix");512 tempfile.persist(fleet_data_path)?;513 Ok(())514 }515}516517#[derive(Clone)]518enum HostItem {519 Host {520 name: String,521 attrs: BTreeMap<String, String>,522 },523 Tag {524 name: String,525 attrs: BTreeMap<String, String>,526 },527}528fn host_item_parser(input: &str) -> Result<HostItem, String> {529 fn err_to_string(err: nom::Err<nom::error::Error<&str>>) -> String {530 err.to_string()531 }532533 let (input, is_tag) = map(opt(char('@')), |c| c.is_some())(input).map_err(err_to_string)?;534 let (input, name) = map(535 take_while1(|v| v != ',' && v != '?' && v != '@'),536 str::to_owned,537 )(input)538 .map_err(err_to_string)?;539540 let kw_item = separated_pair(541 map(take_while1(|v| v != '&' && v != '='), str::to_owned),542 char('='),543 map(take_while1(|v| v != '&'), str::to_owned),544 );545 let kw = map(separated_list1(char('&'), kw_item), |vec| {546 vec.into_iter().collect::<BTreeMap<_, _>>()547 });548 let mut opt_kw = map(opt(preceded(char('?'), kw)), Option::unwrap_or_default);549550 let (input, attrs) = opt_kw(input).map_err(err_to_string)?;551552 if !input.is_empty() {553 return Err(format!("unexpected trailing input: {input:?}"));554 }555 Ok(if is_tag {556 HostItem::Tag { name, attrs }557 } else {558 HostItem::Host { name, attrs }559 })560}561562#[derive(Parser, Clone)]563pub struct FleetOpts {564 565 #[clap(long, number_of_values = 1, value_parser = host_item_parser)]566 only: Vec<HostItem>,567568 569 #[clap(long, number_of_values = 1)]570 skip: Vec<String>,571572 573 #[clap(long)]574 pub localhost: Option<String>,575576 577 578 #[clap(long, default_value = "detect")]579 pub local_system: String,580}581582impl FleetOpts {583 pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {584 if self.localhost.is_none() {585 self.localhost586 .replace(hostname::get().unwrap().to_str().unwrap().to_owned());587 }588 let directory = current_dir()?;589590 let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;591 let root_field = pool.get().await?;592593 let builtins_field = Value::binding(root_field.clone(), "builtins").await?;594 if self.local_system == "detect" {595 self.local_system = nix_go_json!(builtins_field.currentSystem);596 }597 let local_system = self.local_system.clone();598599 let mut fleet_data_path = directory.clone();600 fleet_data_path.push("fleet.nix");601 let bytes = std::fs::read_to_string(fleet_data_path)?;602 let data: Mutex<FleetData> = nixlike::parse_str(&bytes)?;603604 let fleet_root = Value::binding(root_field, "fleetConfigurations").await?;605 let fleet_field = nix_go!(fleet_root.default({ data }));606607 let config_field = nix_go!(fleet_field.config);608609 assert_warn("fleet config evaluation", &config_field).await?;610611 let import = nix_go!(builtins_field.import);612 let overlays = nix_go!(config_field.nixpkgs.overlays);613 let nixpkgs = nix_go!(fleet_field.nixpkgs.buildUsing | import);614615 let default_pkgs = nix_go!(nixpkgs(Obj {616 overlays,617 system: { self.local_system.clone() },618 }));619620 Ok(Config(Arc::new(FleetConfigInternals {621 opts: self,622 directory,623 data,624 local_system,625 nix_args,626 config_field,627 default_pkgs,628 })))629 }630}